/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapred.TezContainerLogAppender;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.log4j.Appender;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
import org.apache.tez.runtime.api.impl.TezUmbilical;
import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
import org.apache.tez.runtime.common.objectregistry.ObjectRegistryModule;
import org.apache.tez.runtime.library.common.security.TokenCache;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class YarnTezDagChild {
    private static final Logger LOG = Logger.getLogger(YarnTezDagChild.class);
    private static AtomicBoolean stopped = new AtomicBoolean(false);
    private static String containerIdStr;
    private static int maxEventsToGet;
    private static LinkedBlockingQueue<TezEvent> eventsToSend;
    private static AtomicLong requestCounter;
    private static TezTaskAttemptID currentTaskAttemptID;
    private static long amPollInterval;
    private static TezTaskUmbilicalProtocol umbilical;
    private static ReentrantReadWriteLock taskLock;
    private static LogicalIOProcessorRuntimeTask currentTask;
    private static AtomicBoolean heartbeatError;
    private static Throwable heartbeatErrorException;

    private static Thread startHeartbeatThread() {
        Thread heartbeatThread = new Thread(new Runnable(){

            public void run() {
                while (!stopped.get() && !heartbeatError.get()) {
                    try {
                        Thread.sleep(amPollInterval);
                        try {
                            if (YarnTezDagChild.heartbeat()) continue;
                        }
                        catch (SecretManager.InvalidToken e) {
                            LOG.error((Object)"Heartbeat error in authenticating with AM: ", (Throwable)e);
                            heartbeatErrorException = e;
                            heartbeatError.set(true);
                        }
                        catch (Throwable e) {
                            LOG.error((Object)"Heartbeat error in communicating with AM. ", e);
                            heartbeatErrorException = e;
                            heartbeatError.set(true);
                        }
                        break;
                    }
                    catch (InterruptedException e) {
                        LOG.info((Object)("Heartbeat thread interrupted.  stopped: " + stopped.get() + " error: " + heartbeatError.get()));
                    }
                }
                if (!stopped.get()) {
                    if (heartbeatErrorException != null) {
                        ExitUtil.terminate((int)-1, (Throwable)heartbeatErrorException);
                    } else {
                        ExitUtil.terminate((int)-1, (String)"Exiting Tez Child Process");
                    }
                }
            }
        });
        heartbeatThread.setName("Tez Container Heartbeat Thread [" + containerIdStr + "]");
        heartbeatThread.setDaemon(true);
        heartbeatThread.start();
        return heartbeatThread;
    }

    private static synchronized boolean heartbeat() throws TezException, IOException {
        return YarnTezDagChild.heartbeat(null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static synchronized boolean heartbeat(Collection<TezEvent> outOfBandEvents) throws TezException, IOException {
        TezEvent updateEvent = null;
        int eventCounter = 0;
        int eventsRange = 0;
        TezTaskAttemptID taskAttemptID = null;
        ArrayList<TezEvent> events = new ArrayList<TezEvent>();
        try {
            taskLock.readLock().lock();
            if (currentTask != null) {
                eventsToSend.drainTo(events);
                taskAttemptID = currentTaskAttemptID;
                eventCounter = currentTask.getEventCounter();
                eventsRange = maxEventsToGet;
                if (!currentTask.isTaskDone() && !currentTask.hadFatalError()) {
                    updateEvent = new TezEvent((Event)new TaskStatusUpdateEvent(currentTask.getCounters(), currentTask.getProgress()), new EventMetaData(EventMetaData.EventProducerConsumerType.SYSTEM, currentTask.getVertexName(), "", taskAttemptID));
                    events.add(updateEvent);
                } else if (outOfBandEvents == null && events.isEmpty()) {
                    LOG.info((Object)"Setting TaskAttemptID to null as the task has already completed. Caused by race-condition between the normal heartbeat and out-of-band heartbeats");
                    taskAttemptID = null;
                } else if (outOfBandEvents != null && !outOfBandEvents.isEmpty()) {
                    events.addAll(outOfBandEvents);
                }
            }
        }
        finally {
            taskLock.readLock().unlock();
        }
        long reqId = requestCounter.incrementAndGet();
        TezHeartbeatRequest request = new TezHeartbeatRequest(reqId, events, containerIdStr, taskAttemptID, eventCounter, eventsRange);
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Sending heartbeat to AM, request=" + request.toString()));
        }
        TezHeartbeatResponse response = umbilical.heartbeat(request);
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Received heartbeat response from AM, response=" + response));
        }
        if (response.shouldDie()) {
            LOG.info((Object)"Received should die response from AM");
            return false;
        }
        if (response.getLastRequestId() != reqId) {
            throw new TezException("AM and Task out of sync, responseReqId=" + response.getLastRequestId() + ", expectedReqId=" + reqId);
        }
        try {
            taskLock.readLock().lock();
            if (taskAttemptID == null || !taskAttemptID.equals((Object)currentTaskAttemptID)) {
                if (response.getEvents() != null && !response.getEvents().isEmpty()) {
                    LOG.warn((Object)("No current assigned task, ignoring all events in heartbeat response, eventCount=" + response.getEvents().size()));
                }
                boolean bl = true;
                return bl;
            }
            if (currentTask != null && response.getEvents() != null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Routing events from heartbeat response to task, currentTaskAttemptId=" + currentTaskAttemptID + ", eventCount=" + response.getEvents().size()));
                }
                currentTask.handleEvents((Collection)response.getEvents());
            }
        }
        finally {
            taskLock.readLock().unlock();
        }
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void main(String[] args) throws Throwable {
        Thread.setDefaultUncaughtExceptionHandler((Thread.UncaughtExceptionHandler)new YarnUncaughtExceptionHandler());
        LOG.info((Object)"YarnTezDagChild starting");
        final Configuration defaultConf = new Configuration();
        TezUtils.addUserSpecifiedTezConfiguration((Configuration)defaultConf);
        UserGroupInformation.setConfiguration((Configuration)defaultConf);
        Limits.setConfiguration((Configuration)defaultConf);
        assert (args.length == 5);
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        final InetSocketAddress address = NetUtils.createSocketAddrForHost((String)host, (int)port);
        String containerIdentifier = args[2];
        String tokenIdentifier = args[3];
        int attemptNumber = Integer.parseInt(args[4]);
        if (LOG.isDebugEnabled()) {
            LOG.info((Object)("Info from cmd line: AM-host: " + host + " AM-port: " + port + " containerIdentifier: " + containerIdentifier + " attemptNumber: " + attemptNumber + " tokenIdentifier: " + tokenIdentifier));
        }
        DefaultMetricsSystem.initialize((String)"VertexTask");
        containerIdStr = containerIdentifier;
        ObjectRegistryImpl objectRegistry = new ObjectRegistryImpl();
        Injector injector = Guice.createInjector((Module[])new Module[]{new ObjectRegistryModule((ObjectRegistry)objectRegistry)});
        Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
        if (LOG.isDebugEnabled()) {
            LOG.info((Object)"Executing with tokens:");
            for (Token token : credentials.getAllTokens()) {
                LOG.info((Object)token);
            }
        }
        amPollInterval = defaultConf.getLong("tez.task.am.heartbeat.interval-ms.max", 100L);
        maxEventsToGet = defaultConf.getInt("tez.task.max-events-per-heartbeat.max", 100);
        UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser((String)tokenIdentifier);
        Token jobToken = TokenCache.getJobToken((Credentials)credentials);
        SecurityUtil.setTokenService((Token)jobToken, (InetSocketAddress)address);
        taskOwner.addToken(jobToken);
        HashMap<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
        serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, ShuffleUtils.convertJobTokenToBytes((Token)jobToken));
        umbilical = (TezTaskUmbilicalProtocol)taskOwner.doAs((PrivilegedExceptionAction)new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>(){

            @Override
            public TezTaskUmbilicalProtocol run() throws Exception {
                return (TezTaskUmbilicalProtocol)RPC.getProxy(TezTaskUmbilicalProtocol.class, (long)19L, (InetSocketAddress)address, (Configuration)defaultConf);
            }
        });
        final Thread heartbeatThread = YarnTezDagChild.startHeartbeatThread();
        TezUmbilical tezUmbilical = new TezUmbilical(){

            public void addEvents(Collection<TezEvent> events) {
                eventsToSend.addAll(events);
            }

            public void signalFatalError(TezTaskAttemptID taskAttemptID, String diagnostics, EventMetaData sourceInfo) {
                TezEvent taskAttemptFailedEvent = new TezEvent((Event)new TaskAttemptFailedEvent(diagnostics), sourceInfo);
                try {
                    YarnTezDagChild.heartbeat(Collections.singletonList(taskAttemptFailedEvent));
                }
                catch (Throwable t) {
                    LOG.fatal((Object)"Failed to communicate task attempt failure to AM via umbilical", t);
                    heartbeatErrorException = t;
                    heartbeatError.set(true);
                    heartbeatThread.interrupt();
                }
            }

            public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException {
                return umbilical.canCommit(taskAttemptID);
            }
        };
        String pid = System.getenv().get("JVM_PID");
        LOG.info((Object)("PID, containerIdentifier: " + pid + ", " + containerIdentifier));
        ContainerTask containerTask = null;
        UserGroupInformation childUGI = null;
        ContainerContext containerContext = new ContainerContext(containerIdentifier, pid);
        int getTaskMaxSleepTime = defaultConf.getInt("tez.task.get-task.sleep.interval-ms.max", 200);
        int taskCount = 0;
        TezVertexID lastVertexId = null;
        EventMetaData currentSourceInfo = null;
        try {
            do {
                EventMetaData sourceInfo;
                if (taskCount > 0) {
                    YarnTezDagChild.updateLoggers(null);
                }
                boolean isNewGetTask = true;
                long getTaskPollStartTime = System.currentTimeMillis();
                long nextGetTaskPrintTime = getTaskPollStartTime + 2000L;
                int idle = 0;
                while (null == containerTask) {
                    if (!isNewGetTask) {
                        long sleepTimeMilliSecs = Math.min(idle * 10, getTaskMaxSleepTime);
                        if (sleepTimeMilliSecs + System.currentTimeMillis() > nextGetTaskPrintTime) {
                            LOG.info((Object)("Sleeping for " + sleepTimeMilliSecs + "ms before retrying getTask again. Got null now. " + "Next getTask sleep message after 2s"));
                            nextGetTaskPrintTime = System.currentTimeMillis() + sleepTimeMilliSecs + 2000L;
                        }
                        TimeUnit.MILLISECONDS.sleep(sleepTimeMilliSecs);
                    } else {
                        LOG.info((Object)"Attempting to fetch new task");
                    }
                    isNewGetTask = false;
                    containerTask = umbilical.getTask(containerContext);
                    ++idle;
                }
                LOG.info((Object)("Got TaskUpdate: " + (System.currentTimeMillis() - getTaskPollStartTime) + " ms after starting to poll." + " TaskInfo: shouldDie: " + containerTask.shouldDie() + (containerTask.shouldDie() ? "" : ", currentTaskAttemptId: " + containerTask.getTaskSpec().getTaskAttemptID())));
                if (containerTask.shouldDie()) {
                    return;
                }
                ++taskCount;
                TaskSpec taskSpec = containerTask.getTaskSpec();
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("New container task context:" + taskSpec.toString()));
                }
                try {
                    taskLock.writeLock().lock();
                    currentTaskAttemptID = taskSpec.getTaskAttemptID();
                    TezVertexID newVertexId = currentTaskAttemptID.getTaskID().getVertexID();
                    if (lastVertexId != null) {
                        if (!lastVertexId.equals((Object)newVertexId)) {
                            objectRegistry.clearCache(ObjectLifeCycle.VERTEX);
                        }
                        if (!lastVertexId.getDAGId().equals((Object)newVertexId.getDAGId())) {
                            objectRegistry.clearCache(ObjectLifeCycle.DAG);
                        }
                    }
                    lastVertexId = newVertexId;
                    YarnTezDagChild.updateLoggers(currentTaskAttemptID);
                    currentTask = YarnTezDagChild.createLogicalTask(attemptNumber, taskSpec, defaultConf, tezUmbilical, serviceConsumerMetadata);
                }
                finally {
                    taskLock.writeLock().unlock();
                }
                currentSourceInfo = sourceInfo = new EventMetaData(EventMetaData.EventProducerConsumerType.SYSTEM, taskSpec.getVertexName(), "", currentTaskAttemptID);
                childUGI = UserGroupInformation.createRemoteUser((String)System.getenv(ApplicationConstants.Environment.USER.toString()));
                childUGI.addCredentials(credentials);
                childUGI.doAs((PrivilegedExceptionAction)new PrivilegedExceptionAction<Object>(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public Object run() throws Exception {
                        try {
                            LOG.info((Object)("Initializing task, taskAttemptId=" + currentTaskAttemptID));
                            currentTask.initialize();
                            if (!currentTask.hadFatalError()) {
                                LOG.info((Object)("Running task, taskAttemptId=" + currentTaskAttemptID));
                                currentTask.run();
                                LOG.info((Object)("Closing task, taskAttemptId=" + currentTaskAttemptID));
                                currentTask.close();
                            }
                            LOG.info((Object)("Task completed, taskAttemptId=" + currentTaskAttemptID + ", fatalErrorOccurred=" + currentTask.hadFatalError()));
                            if (!currentTask.hadFatalError()) {
                                TezEvent statusUpdateEvent = new TezEvent((Event)new TaskStatusUpdateEvent(currentTask.getCounters(), currentTask.getProgress()), new EventMetaData(EventMetaData.EventProducerConsumerType.SYSTEM, currentTask.getVertexName(), "", currentTask.getTaskAttemptID()));
                                TezEvent taskCompletedEvent = new TezEvent((Event)new TaskAttemptCompletedEvent(), sourceInfo);
                                YarnTezDagChild.heartbeat(Arrays.asList(statusUpdateEvent, taskCompletedEvent));
                            }
                        }
                        finally {
                            currentTask.cleanup();
                        }
                        try {
                            taskLock.writeLock().lock();
                            currentTask = null;
                            currentTaskAttemptID = null;
                        }
                        finally {
                            taskLock.writeLock().unlock();
                        }
                        return null;
                    }
                });
                FileSystem.closeAllForUGI((UserGroupInformation)childUGI);
                containerTask = null;
            } while (!heartbeatError.get());
            LOG.fatal((Object)"Breaking out of task loop, heartbeat error occurred", heartbeatErrorException);
        }
        catch (FSError e) {
            LOG.fatal((Object)"FSError from child", (Throwable)e);
            try {
                taskLock.readLock().lock();
                if (currentTask != null && !currentTask.hadFatalError()) {
                    currentTask.setFatalError((Throwable)e, "FS Error in Child JVM");
                    TezEvent taskAttemptFailedEvent = new TezEvent((Event)new TaskAttemptFailedEvent(StringUtils.stringifyException((Throwable)e)), currentSourceInfo);
                    YarnTezDagChild.heartbeat(Collections.singletonList(taskAttemptFailedEvent));
                }
            }
            finally {
                taskLock.readLock().unlock();
            }
        }
        catch (Throwable throwable) {
            String cause = StringUtils.stringifyException((Throwable)throwable);
            LOG.fatal((Object)("Error running child : " + cause));
            taskLock.readLock().lock();
            try {
                if (currentTask != null && !currentTask.hadFatalError()) {
                    currentTask.setFatalError(throwable, "Error in Child JVM");
                    TezEvent taskAttemptFailedEvent = new TezEvent((Event)new TaskAttemptFailedEvent(cause), currentSourceInfo);
                    YarnTezDagChild.heartbeat(Collections.singletonList(taskAttemptFailedEvent));
                }
            }
            finally {
                taskLock.readLock().unlock();
            }
        }
        finally {
            stopped.set(true);
            heartbeatThread.interrupt();
            RPC.stopProxy((Object)umbilical);
            DefaultMetricsSystem.shutdown();
            LogManager.shutdown();
        }
    }

    private static LogicalIOProcessorRuntimeTask createLogicalTask(int attemptNum, TaskSpec taskSpec, Configuration conf, TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata) throws IOException {
        conf.setBoolean("ipc.client.tcpnodelay", true);
        FileSystem.get((Configuration)conf).setWorkingDirectory(YarnTezDagChild.getWorkingDirectory(conf));
        Object[] localDirs = StringUtils.getTrimmedStrings((String)System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name()));
        conf.setStrings("tez.runtime.local.dirs", (String[])localDirs);
        LOG.info((Object)("LocalDirs for child: " + Arrays.toString(localDirs)));
        return new LogicalIOProcessorRuntimeTask(taskSpec, attemptNum, conf, tezUmbilical, serviceConsumerMetadata);
    }

    private static Path getWorkingDirectory(Configuration conf) {
        String name = conf.get("mapreduce.job.working.dir");
        if (name != null) {
            return new Path(name);
        }
        try {
            Path dir = FileSystem.get((Configuration)conf).getWorkingDirectory();
            conf.set("mapreduce.job.working.dir", dir.toString());
            return dir;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static void updateLoggers(TezTaskAttemptID tezTaskAttemptID) throws FileNotFoundException {
        String containerLogDir = null;
        LOG.info((Object)("Redirecting log files based on TaskAttemptId: " + tezTaskAttemptID));
        Appender appender = Logger.getRootLogger().getAppender("CLA");
        if (appender != null) {
            if (appender instanceof TezContainerLogAppender) {
                TezContainerLogAppender claAppender = (TezContainerLogAppender)appender;
                containerLogDir = claAppender.getContainerLogDir();
                claAppender.setLogFileName(YarnTezDagChild.constructLogFileName("syslog", tezTaskAttemptID));
                claAppender.activateOptions();
            } else {
                LOG.warn((Object)("Appender is a " + appender.getClass() + "; require an instance of " + TezContainerLogAppender.class.getName() + " to reconfigure the logger output"));
            }
        } else {
            LOG.warn((Object)"Not configured with appender named: CLA. Cannot reconfigure logger output");
        }
        if (containerLogDir != null) {
            System.setOut(new PrintStream(new File(containerLogDir, YarnTezDagChild.constructLogFileName("stdout", tezTaskAttemptID))));
            System.setErr(new PrintStream(new File(containerLogDir, YarnTezDagChild.constructLogFileName("stderr", tezTaskAttemptID))));
        }
    }

    private static String constructLogFileName(String base, TezTaskAttemptID tezTaskAttemptID) {
        if (tezTaskAttemptID == null) {
            return base;
        }
        return base + "_" + tezTaskAttemptID.toString();
    }

    static {
        maxEventsToGet = 0;
        eventsToSend = new LinkedBlockingQueue();
        requestCounter = new AtomicLong(0L);
        taskLock = new ReentrantReadWriteLock();
        currentTask = null;
        heartbeatError = new AtomicBoolean(false);
        heartbeatErrorException = null;
    }
}

