/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.api.client;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import javax.ws.rs.core.MediaType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.client.FrameworkClient;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.client.TimelineReaderFactory;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.records.DAGProtos;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;

/**
 * {@link DAGClient} implementation that obtains DAG and vertex status by
 * issuing HTTP GET requests against the timeline REST endpoint rooted at
 * {@link #baseUri} and parsing the returned JSON.
 *
 * <p>This client is read-only: {@link #tryKillDAG()} always throws, and the
 * {@code waitForCompletion*} methods return the current status without
 * polling.
 */
@InterfaceAudience.Private
public class DAGClientTimelineImpl
extends DAGClient {
    private static final Log LOG = LogFactory.getLog(DAGClientTimelineImpl.class);
    // Value sent as the "fields" query parameter on every timeline request.
    private static final String FILTER_BY_FIELDS = "primaryfilters,otherinfo";
    private static final String HTTPS_SCHEME = "https://";
    private static final String HTTP_SCHEME = "http://";
    // Created lazily by getCachedHttpClient() and destroyed by close().
    private Client httpClient = null;
    // Supplies the HTTP client used to talk to the timeline endpoint.
    private final TimelineReaderFactory.TimelineReaderStrategy timelineReaderStrategy;
    private final ApplicationId appId;
    private final String dagId;
    // Used only to fetch the application report for appId.
    private final FrameworkClient frameworkClient;
    // Per-vertex task counts keyed by vertex name; filled on the first successful
    // parseTaskStatsForVertexes() call and reused afterwards.
    private Map<String, VertexTaskStats> vertexTaskStatsCache = null;
    // scheme + configured webapp address + "/ws/v1/timeline"; set by the constructor.
    @VisibleForTesting
    protected String baseUri;
    private static final Map<String, DAGProtos.DAGStatusStateProto> dagStateProtoMap = Collections.unmodifiableMap(new HashMap<String, DAGProtos.DAGStatusStateProto>(){
        {
            this.put("NEW", DAGProtos.DAGStatusStateProto.DAG_SUBMITTED);
            this.put("INITED", DAGProtos.DAGStatusStateProto.DAG_SUBMITTED);
            this.put("RUNNING", DAGProtos.DAGStatusStateProto.DAG_RUNNING);
            this.put("SUCCEEDED", DAGProtos.DAGStatusStateProto.DAG_SUCCEEDED);
            this.put("FAILED", DAGProtos.DAGStatusStateProto.DAG_FAILED);
            this.put("KILLED", DAGProtos.DAGStatusStateProto.DAG_KILLED);
            this.put("ERROR", DAGProtos.DAGStatusStateProto.DAG_ERROR);
            this.put("TERMINATING", DAGProtos.DAGStatusStateProto.DAG_TERMINATING);
        }
    });
    private static final Map<String, DAGProtos.VertexStatusStateProto> vertexStateProtoMap = Collections.unmodifiableMap(new HashMap<String, DAGProtos.VertexStatusStateProto>(){
        {
            this.put("NEW", DAGProtos.VertexStatusStateProto.VERTEX_NEW);
            this.put("INITIALIZING", DAGProtos.VertexStatusStateProto.VERTEX_INITIALIZING);
            this.put("RECOVERING", DAGProtos.VertexStatusStateProto.VERTEX_RECOVERING);
            this.put("INITED", DAGProtos.VertexStatusStateProto.VERTEX_INITED);
            this.put("RUNNING", DAGProtos.VertexStatusStateProto.VERTEX_RUNNING);
            this.put("SUCCEEDED", DAGProtos.VertexStatusStateProto.VERTEX_SUCCEEDED);
            this.put("FAILED", DAGProtos.VertexStatusStateProto.VERTEX_FAILED);
            this.put("KILLED", DAGProtos.VertexStatusStateProto.VERTEX_KILLED);
            this.put("ERROR", DAGProtos.VertexStatusStateProto.VERTEX_ERROR);
            this.put("TERMINATING", DAGProtos.VertexStatusStateProto.VERTEX_TERMINATING);
        }
    });

    /**
     * Builds a client for one DAG of one application.
     *
     * <p>The timeline base URI is assembled from the scheme (https when
     * {@code webappHttpsOnly(conf)} reports true, http otherwise), the matching
     * webapp address read from {@code conf}, and the fixed path
     * {@code /ws/v1/timeline}.
     *
     * @param appId application that owns the DAG
     * @param dagId identifier of the DAG to query
     * @param conf configuration providing the timeline webapp address
     * @param frameworkClient client used to fetch the application report
     * @param connTimeout connection timeout handed to the timeline reader strategy
     * @throws TezException if the timeline client is not supported, the HTTPS
     *         policy cannot be determined, or no webapp address is configured
     */
    public DAGClientTimelineImpl(ApplicationId appId, String dagId, TezConfiguration conf, FrameworkClient frameworkClient, int connTimeout) throws TezException {
        if (!TimelineReaderFactory.isTimelineClientSupported()) {
            throw new TezException("Reading from secure timeline is supported only for hadoop 2.6 and above.");
        }
        this.appId = appId;
        this.dagId = dagId;
        this.frameworkClient = frameworkClient;

        final boolean httpsOnly = webappHttpsOnly(conf);
        final String scheme = httpsOnly ? HTTPS_SCHEME : HTTP_SCHEME;
        final String addressKey = httpsOnly
            ? "yarn.timeline-service.webapp.https.address"
            : "yarn.timeline-service.webapp.address";
        final String webAppAddress = conf.get(addressKey);
        if (webAppAddress == null) {
            throw new TezException("Failed to get ATS webapp address");
        }
        this.baseUri = Joiner.on("").join(scheme, webAppAddress, "/ws/v1/timeline");
        this.timelineReaderStrategy = TimelineReaderFactory.getTimelineReaderStrategy(conf, httpsOnly, connTimeout);
    }

    /**
     * Reports whether this client can be used, as decided by
     * {@link TimelineReaderFactory#isTimelineClientSupported()}.
     *
     * @return the value returned by the factory check
     */
    public static boolean isSupported() {
        return TimelineReaderFactory.isTimelineClientSupported();
    }

    /**
     * Describes where the DAG runs.
     *
     * @return a fixed message followed by the application id
     */
    @Override
    public String getExecutionContext() {
        return "Executing on YARN cluster with App id " + appId;
    }

    /**
     * Fetches the application report for this client's application id.
     *
     * <p>This is best-effort: failures are logged rather than propagated so
     * that callers keep receiving {@code null} when the report is unavailable.
     *
     * @return the application report, or {@code null} if it could not be fetched
     */
    @Override
    protected ApplicationReport getApplicationReportInternal() {
        try {
            return this.frameworkClient.getApplicationReport(this.appId);
        } catch (YarnException e) {
            // Previously swallowed silently; log so the failure is diagnosable.
            LOG.warn("Failed to get application report for " + this.appId, e);
        } catch (IOException e) {
            LOG.warn("Failed to get application report for " + this.appId, e);
        }
        return null;
    }

    /**
     * Fetches the DAG entity from the timeline endpoint and converts it to a
     * {@link DAGStatus}.
     *
     * @param statusOptions optional flags (may be {@code null}); passed on to
     *        the JSON parsing step
     * @return the status built from the timeline response
     * @throws TezException if the request fails, the JSON cannot be parsed, or
     *         the response carries no status
     */
    @Override
    public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions) throws IOException, TezException {
        final String dagUrl = String.format("%s/%s/%s?fields=%s", baseUri, "TEZ_DAG_ID", dagId, FILTER_BY_FIELDS);
        final DAGProtos.DAGStatusProto.Builder dagStatusBuilder;
        try {
            dagStatusBuilder = parseDagStatus(getJsonRootEntity(dagUrl), statusOptions);
        } catch (JSONException je) {
            throw new TezException("Failed to parse DagStatus json from YARN Timeline", je);
        }
        if (dagStatusBuilder == null) {
            throw new TezException("Failed to get DagStatus from ATS");
        }
        return new DAGStatus(dagStatusBuilder);
    }

    /**
     * Looks up a single vertex of this DAG by name on the timeline endpoint
     * and converts it to a {@link VertexStatus}.
     *
     * <p>The vertex name is percent-encoded before being placed in the query
     * string, so names containing spaces or reserved characters (for example
     * {@code "Map 1"}) produce a valid URL instead of an invalid or
     * misinterpreted one.
     *
     * @param vertexName name of the vertex to query
     * @param statusOptions optional flags (may be {@code null}); passed on to
     *        the JSON parsing step
     * @return the status built from the timeline response
     * @throws IOException if the UTF-8 encoding is unavailable
     * @throws TezException if the request fails, the response does not contain
     *         exactly one entity, or the JSON cannot be parsed
     */
    @Override
    public VertexStatus getVertexStatus(String vertexName, Set<StatusGetOpts> statusOptions) throws IOException, TezException {
        // String.valueOf keeps the previous rendering of a null name as "null".
        // URLEncoder emits '+' for spaces; "%20" is used instead so the value is
        // decoded as a space regardless of how the server treats '+'.
        String encodedVertexName = URLEncoder.encode(String.valueOf(vertexName), "UTF-8").replace("+", "%20");
        String url = String.format("%s/%s?primaryFilter=%s:%s&secondaryFilter=vertexName:%s&fields=%s", this.baseUri, "TEZ_VERTEX_ID", "TEZ_DAG_ID", this.dagId, encodedVertexName, FILTER_BY_FIELDS);
        try {
            JSONObject jsonRoot = this.getJsonRootEntity(url);
            JSONArray entitiesNode = jsonRoot.optJSONArray("entities");
            // Exactly one matching vertex entity is expected for a given name.
            if (entitiesNode == null || entitiesNode.length() != 1) {
                throw new TezException("Failed to get vertex status YARN Timeline");
            }
            JSONObject vertexNode = entitiesNode.getJSONObject(0);
            DAGProtos.VertexStatusProto.Builder statusBuilder = this.parseVertexStatus(vertexNode, statusOptions);
            if (statusBuilder == null) {
                throw new TezException("Failed to parse vertex status from YARN Timeline");
            }
            return new VertexStatus(statusBuilder);
        } catch (JSONException je) {
            throw new TezException("Failed to parse VertexStatus json from YARN Timeline", je);
        }
    }

    /**
     * Not supported by this client.
     *
     * @throws TezException always
     */
    @Override
    public void tryKillDAG() throws IOException, TezException {
        throw new TezException("tryKillDAG is unsupported for DAGClientTimelineImpl");
    }

    /**
     * Returns the current DAG status without waiting; equivalent to
     * {@code getDAGStatus(null)}.
     *
     * @return the DAG status read from the timeline endpoint
     */
    @Override
    public DAGStatus waitForCompletion() throws IOException, TezException, InterruptedException {
        return getDAGStatus(null);
    }

    /**
     * Returns the current DAG status without waiting or printing updates;
     * equivalent to {@code getDAGStatus(statusGetOpts)}.
     *
     * @param statusGetOpts optional flags (may be {@code null})
     * @return the DAG status read from the timeline endpoint
     */
    @Override
    public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts> statusGetOpts) throws IOException, TezException, InterruptedException {
        return getDAGStatus(statusGetOpts);
    }

    /**
     * Destroys the cached HTTP client, if one was created, and clears the
     * reference so a later request would create a new one.
     */
    @Override
    public void close() throws IOException {
        if (httpClient == null) {
            return;
        }
        httpClient.destroy();
        httpClient = null;
    }

    /**
     * Converts the JSON of a DAG entity into a DAG status builder.
     *
     * <p>State and diagnostics come from the "otherinfo" node. Counters are
     * added only when {@code GET_COUNTERS} is requested. Overall and
     * per-vertex progress are added when per-vertex task statistics are
     * available.
     *
     * @param jsonRoot the DAG entity JSON
     * @param statusOptions optional flags; may be {@code null}
     * @return the populated builder, or {@code null} if "otherinfo" has no status
     * @throws JSONException if "otherinfo" is missing or vertex data is malformed
     * @throws TezException if the status string is not a known DAG state, or
     *         fetching vertex statistics fails
     */
    private DAGProtos.DAGStatusProto.Builder parseDagStatus(JSONObject jsonRoot, Set<StatusGetOpts> statusOptions) throws JSONException, TezException {
        JSONObject otherInfoNode = jsonRoot.getJSONObject("otherinfo");
        String status = otherInfoNode.optString("status");
        String diagnostics = otherInfoNode.optString("diagnostics");
        if (status.equals("")) {
            return null;
        }
        // An unmapped state used to surface as a NullPointerException from the
        // protobuf setter; report it as a descriptive TezException instead.
        DAGProtos.DAGStatusStateProto state = dagStateProtoMap.get(status);
        if (state == null) {
            throw new TezException("Unknown DAG state from YARN Timeline: " + status);
        }
        DAGProtos.DAGStatusProto.Builder dagStatusBuilder = DAGProtos.DAGStatusProto.newBuilder();
        dagStatusBuilder.setState(state).addAllDiagnostics(Collections.singleton(diagnostics));

        if (statusOptions != null && statusOptions.contains(StatusGetOpts.GET_COUNTERS)) {
            DAGProtos.TezCountersProto.Builder tezCounterBuilder = this.parseDagCounters(otherInfoNode.optJSONObject("counters"));
            if (tezCounterBuilder != null) {
                dagStatusBuilder.setDagCounters(tezCounterBuilder);
            }
        }

        // parseTaskStatsForVertexes() returns null when the response has no
        // "entities" array, so guard against null as well as empty.
        Map<String, VertexTaskStats> vertexTaskStatsMap = this.parseTaskStatsForVertexes();
        if (vertexTaskStatsMap != null && !vertexTaskStatsMap.isEmpty()) {
            // A null vertex name aggregates the counts of every vertex.
            DAGProtos.ProgressProto.Builder dagProgressBuilder = this.getProgressBuilder(vertexTaskStatsMap, null);
            dagStatusBuilder.setDAGProgress(dagProgressBuilder);
            ArrayList<DAGProtos.StringProgressPairProto> vertexProgressBuilder = new ArrayList<DAGProtos.StringProgressPairProto>(vertexTaskStatsMap.size());
            for (Map.Entry<String, VertexTaskStats> v : vertexTaskStatsMap.entrySet()) {
                DAGProtos.StringProgressPairProto vertexProgressProto = DAGProtos.StringProgressPairProto.newBuilder().setKey(v.getKey()).setProgress(this.getProgressBuilder(vertexTaskStatsMap, v.getKey())).build();
                vertexProgressBuilder.add(vertexProgressProto);
            }
            dagStatusBuilder.addAllVertexProgress(vertexProgressBuilder);
        }
        return dagStatusBuilder;
    }

    /**
     * Sums task counts into a progress builder.
     *
     * @param vertexTaskStatsMap task statistics keyed by vertex name
     * @param vertexName vertex to report on, or {@code null} to aggregate
     *        every vertex in the map
     * @return a builder holding total, running, succeeded, killed and failed
     *         counts; "running" is computed as total minus completed
     */
    private DAGProtos.ProgressProto.Builder getProgressBuilder(Map<String, VertexTaskStats> vertexTaskStatsMap, String vertexName) {
        int total = 0;
        int succeeded = 0;
        int killed = 0;
        int failed = 0;
        int running = 0;
        for (Map.Entry<String, VertexTaskStats> entry : vertexTaskStatsMap.entrySet()) {
            boolean selected = vertexName == null || vertexName.equals(entry.getKey());
            if (!selected) {
                continue;
            }
            VertexTaskStats stats = entry.getValue();
            total += stats.numTaskCount;
            succeeded += stats.succeededTaskCount;
            killed += stats.killedTaskCount;
            failed += stats.failedTaskCount;
            running += stats.numTaskCount - stats.completedTaskCount;
        }
        return DAGProtos.ProgressProto.newBuilder()
            .setTotalTaskCount(total)
            .setRunningTaskCount(running)
            .setSucceededTaskCount(succeeded)
            .setKilledTaskCount(killed)
            .setFailedTaskCount(failed);
    }

    /**
     * Converts the JSON of a vertex entity into a vertex status builder.
     *
     * <p>State, diagnostics and task counts come from the "otherinfo" node;
     * the running count is computed as "numTasks" minus "numCompletedTasks".
     * Counters are added only when {@code GET_COUNTERS} is requested.
     *
     * @param jsonRoot the vertex entity JSON
     * @param statusOptions optional flags; may be {@code null}
     * @return the populated builder, or {@code null} if "otherinfo" has no status
     * @throws JSONException if "otherinfo" is missing or counters are malformed
     * @throws TezException if the status string is not a known vertex state
     */
    private DAGProtos.VertexStatusProto.Builder parseVertexStatus(JSONObject jsonRoot, Set<StatusGetOpts> statusOptions) throws JSONException, TezException {
        JSONObject otherInfoNode = jsonRoot.getJSONObject("otherinfo");
        String status = otherInfoNode.optString("status");
        String diagnostics = otherInfoNode.optString("diagnostics");
        if (status.equals("")) {
            return null;
        }
        // An unmapped state used to surface as a NullPointerException from the
        // protobuf setter; report it as a descriptive TezException instead.
        DAGProtos.VertexStatusStateProto state = vertexStateProtoMap.get(status);
        if (state == null) {
            throw new TezException("Unknown vertex state from YARN Timeline: " + status);
        }
        DAGProtos.VertexStatusProto.Builder vertexStatusBuilder = DAGProtos.VertexStatusProto.newBuilder();
        vertexStatusBuilder.setState(state).addAllDiagnostics(Collections.singleton(diagnostics));

        int numTasks = otherInfoNode.optInt("numTasks");
        int numRunningTasks = numTasks - otherInfoNode.optInt("numCompletedTasks");
        DAGProtos.ProgressProto.Builder progressBuilder = DAGProtos.ProgressProto.newBuilder();
        progressBuilder.setTotalTaskCount(numTasks);
        progressBuilder.setRunningTaskCount(numRunningTasks);
        progressBuilder.setSucceededTaskCount(otherInfoNode.optInt("numSucceededTasks"));
        progressBuilder.setKilledTaskCount(otherInfoNode.optInt("numKilledTasks"));
        progressBuilder.setFailedTaskCount(otherInfoNode.optInt("numFailedTasks"));
        vertexStatusBuilder.setProgress(progressBuilder);

        if (statusOptions != null && statusOptions.contains(StatusGetOpts.GET_COUNTERS)) {
            DAGProtos.TezCountersProto.Builder tezCounterBuilder = this.parseDagCounters(otherInfoNode.optJSONObject("counters"));
            if (tezCounterBuilder != null) {
                vertexStatusBuilder.setVertexCounters(tezCounterBuilder);
            }
        }
        return vertexStatusBuilder;
    }

    /**
     * Converts a "counters" JSON node into a counters builder.
     *
     * @param countersNode the counters JSON; may be {@code null}
     * @return {@code null} when {@code countersNode} is {@code null}; otherwise
     *         a builder holding every counter group that could be parsed
     *         (empty when there is no "counterGroups" array)
     * @throws JSONException if a counter group is malformed
     */
    private DAGProtos.TezCountersProto.Builder parseDagCounters(JSONObject countersNode) throws JSONException {
        if (countersNode == null) {
            return null;
        }
        final DAGProtos.TezCountersProto.Builder result = DAGProtos.TezCountersProto.newBuilder();
        final JSONArray groups = countersNode.optJSONArray("counterGroups");
        if (groups == null) {
            return result;
        }
        final int groupCount = groups.length();
        for (int idx = 0; idx < groupCount; idx++) {
            final DAGProtos.TezCounterGroupProto.Builder group = parseCounterGroup(groups.optJSONObject(idx));
            // Entries that are not JSON objects yield null and are skipped.
            if (group != null) {
                result.addCounterGroups(group);
            }
        }
        return result;
    }

    /**
     * Converts one counter-group JSON node into a counter group builder.
     *
     * <p>Display names fall back to the plain names when absent. A group
     * without a "counters" array is returned with no counters rather than
     * failing with a {@link NullPointerException}.
     *
     * @param counterGroupNode the counter group JSON; may be {@code null}
     * @return the populated builder, or {@code null} if the node is {@code null}
     * @throws JSONException if a counter entry lacks "counterName" or
     *         "counterValue", or is not a JSON object
     */
    private DAGProtos.TezCounterGroupProto.Builder parseCounterGroup(JSONObject counterGroupNode) throws JSONException {
        if (counterGroupNode == null) {
            return null;
        }
        DAGProtos.TezCounterGroupProto.Builder counterGroup = DAGProtos.TezCounterGroupProto.newBuilder();
        String groupName = counterGroupNode.optString("counterGroupName");
        String groupDisplayName = counterGroupNode.optString("counterGroupDisplayName", groupName);
        JSONArray counterNodes = counterGroupNode.optJSONArray("counters");
        // optJSONArray returns null when the key is absent or not an array.
        int numCounters = counterNodes == null ? 0 : counterNodes.length();
        ArrayList<DAGProtos.TezCounterProto> counters = new ArrayList<DAGProtos.TezCounterProto>(numCounters);
        for (int i = 0; i < numCounters; ++i) {
            JSONObject counterNode = counterNodes.getJSONObject(i);
            String counterName = counterNode.getString("counterName");
            String counterDisplayName = counterNode.optString("counterDisplayName", counterName);
            long counterValue = counterNode.getLong("counterValue");
            counters.add(DAGProtos.TezCounterProto.newBuilder().setName(counterName).setDisplayName(counterDisplayName).setValue(counterValue).build());
        }
        return counterGroup.setName(groupName).setDisplayName(groupDisplayName).addAllCounters(counters);
    }

    /**
     * Returns per-vertex task statistics for this DAG, keyed by vertex name.
     *
     * <p>The statistics are fetched from the timeline endpoint on the first
     * call that yields an "entities" array and are cached afterwards; later
     * calls return the cached map without issuing a request.
     *
     * @return the cached statistics, or {@code null} if nothing is cached and
     *         the response contains no "entities" array
     * @throws TezException if the HTTP request fails
     * @throws JSONException if a vertex entity lacks "otherinfo" or "vertexName"
     */
    @VisibleForTesting
    protected Map<String, VertexTaskStats> parseTaskStatsForVertexes() throws TezException, JSONException {
        if (vertexTaskStatsCache != null) {
            return vertexTaskStatsCache;
        }
        final String verticesUrl = String.format("%s/%s?primaryFilter=%s:%s&fields=%s", baseUri, "TEZ_VERTEX_ID", "TEZ_DAG_ID", dagId, FILTER_BY_FIELDS);
        final JSONArray entities = getJsonRootEntity(verticesUrl).optJSONArray("entities");
        if (entities == null) {
            // Nothing to cache; the field is still null at this point.
            return vertexTaskStatsCache;
        }
        final int vertexCount = entities.length();
        final HashMap<String, VertexTaskStats> statsByVertex = new HashMap<String, VertexTaskStats>(vertexCount);
        for (int idx = 0; idx < vertexCount; idx++) {
            final JSONObject otherInfo = entities.getJSONObject(idx).getJSONObject("otherinfo");
            final String name = otherInfo.getString("vertexName");
            statsByVertex.put(name, new VertexTaskStats(
                otherInfo.optInt("numTasks"),
                otherInfo.optInt("numCompletedTasks"),
                otherInfo.optInt("numSucceededTasks"),
                otherInfo.optInt("numKilledTasks"),
                otherInfo.optInt("numFailedTasks")));
        }
        vertexTaskStatsCache = statsByVertex;
        return vertexTaskStatsCache;
    }

    /**
     * Issues an HTTP GET for {@code url}, requesting JSON, and returns the
     * response body as a {@link JSONObject}.
     *
     * <p>On a non-OK status the response is closed before the exception is
     * thrown, so the unread response is not left open.
     *
     * @param url fully formed timeline URL to fetch
     * @return the parsed JSON body of an OK response
     * @throws TezException if the status is not OK, the URL is invalid, the
     *         HTTP client cannot be obtained, or the response cannot be processed
     */
    @VisibleForTesting
    protected JSONObject getJsonRootEntity(String url) throws TezException {
        try {
            WebResource wr = this.getCachedHttpClient().resource(url);
            ClientResponse response = wr.accept(MediaType.APPLICATION_JSON_TYPE)
                .type(MediaType.APPLICATION_JSON_TYPE)
                .get(ClientResponse.class);
            ClientResponse.Status clientResponseStatus = response.getClientResponseStatus();
            if (clientResponseStatus != ClientResponse.Status.OK) {
                try {
                    response.close();
                } catch (ClientHandlerException ignored) {
                    // Best-effort cleanup; the status error below is the one to report.
                }
                throw new TezException("Failed to get response from YARN Timeline: errorCode:" + clientResponseStatus + ", url:" + url);
            }
            return response.getEntity(JSONObject.class);
        } catch (ClientHandlerException e) {
            throw new TezException("Error processing response from YARN Timeline", e);
        } catch (UniformInterfaceException e) {
            throw new TezException("Error accessing content from YARN Timeline - unexpected response", e);
        } catch (IllegalArgumentException e) {
            throw new TezException("Error accessing content from YARN Timeline - invalid url", e);
        } catch (IOException e) {
            throw new TezException("Error failed to get http client", e);
        }
    }

    /**
     * Determines whether HTTPS should be used by reflectively invoking the
     * static {@code useHttps(Configuration)} method of
     * {@code org.apache.hadoop.yarn.conf.YarnConfiguration}.
     *
     * @param conf configuration passed to {@code useHttps}
     * @return the boolean returned by {@code useHttps}
     * @throws TezException wrapping any reflection failure (class or method
     *         not found, inaccessible, or the invoked method threw)
     */
    private boolean webappHttpsOnly(Configuration conf) throws TezException {
        try {
            final Method useHttpsMethod = Class
                .forName("org.apache.hadoop.yarn.conf.YarnConfiguration")
                .getMethod("useHttps", Configuration.class);
            // Static method, hence the null receiver.
            final Object result = useHttpsMethod.invoke(null, conf);
            return (Boolean) result;
        } catch (ClassNotFoundException e) {
            throw new TezException(e);
        } catch (NoSuchMethodException e) {
            throw new TezException(e);
        } catch (IllegalAccessException e) {
            throw new TezException(e);
        } catch (InvocationTargetException e) {
            throw new TezException(e);
        }
    }

    /**
     * Returns the HTTP client, obtaining it from the timeline reader strategy
     * on first use and reusing it until {@link #close()} clears it.
     *
     * @return the cached HTTP client
     * @throws IOException if the strategy fails to provide a client
     */
    protected Client getCachedHttpClient() throws IOException {
        Client client = this.httpClient;
        if (client == null) {
            client = timelineReaderStrategy.getHttpClient();
            this.httpClient = client;
        }
        return client;
    }

    /**
     * Immutable holder for the task counts of one vertex, as read from the
     * vertex entity's "otherinfo" node.
     */
    @VisibleForTesting
    protected class VertexTaskStats {
        // Populated from "numTasks".
        final int numTaskCount;
        // Populated from "numCompletedTasks".
        final int completedTaskCount;
        // Populated from "numSucceededTasks".
        final int succeededTaskCount;
        // Populated from "numKilledTasks".
        final int killedTaskCount;
        // Populated from "numFailedTasks".
        final int failedTaskCount;

        /**
         * Stores the given counts unchanged.
         *
         * @param numTaskCount total number of tasks
         * @param completedTaskCount number of completed tasks
         * @param succeededTaskCount number of succeeded tasks
         * @param killedTaskCount number of killed tasks
         * @param failedTaskCount number of failed tasks
         */
        public VertexTaskStats(int numTaskCount, int completedTaskCount, int succeededTaskCount, int killedTaskCount, int failedTaskCount) {
            this.numTaskCount = numTaskCount;
            this.completedTaskCount = completedTaskCount;
            this.succeededTaskCount = succeededTaskCount;
            this.killedTaskCount = killedTaskCount;
            this.failedTaskCount = failedTaskCount;
        }
    }
}

