This project has retired. For details, please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
020 import java.util.Map;
021
022 import org.apache.hadoop.mapred.Counters;
023 import org.apache.hadoop.mapred.Counters.Counter;
024 import org.apache.pig.tools.pigstats.JobStats;
025 import org.apache.pig.tools.pigstats.PigStats;
026 import org.apache.pig.tools.pigstats.PigStatsUtil;
027 import org.json.simple.JSONObject;
028
029 /**
030 * Class to collect the Pig statistics for a Pig action
031 *
032 */
033 public class OoziePigStats extends ActionStats {
034 PigStats pigStats = null;
035
036 public OoziePigStats(PigStats pigStats) {
037 this.currentActionType = ActionType.PIG;
038 this.pigStats = pigStats;
039 }
040
041 /**
042 * The PigStats API is used to collect the statistics and the result is returned as a JSON String.
043 *
044 * @return a JSON string
045 */
046 @SuppressWarnings("unchecked")
047 @Override
048 public String toJSON() {
049 JSONObject pigStatsGroup = new JSONObject();
050 pigStatsGroup.put("ACTION_TYPE", getCurrentActionType().toString());
051
052 // pig summary related counters
053 pigStatsGroup.put("BYTES_WRITTEN", Long.toString(pigStats.getBytesWritten()));
054 pigStatsGroup.put("DURATION", Long.toString(pigStats.getDuration()));
055 pigStatsGroup.put("ERROR_CODE", Long.toString(pigStats.getErrorCode()));
056 pigStatsGroup.put("ERROR_MESSAGE", pigStats.getErrorMessage());
057 pigStatsGroup.put("FEATURES", pigStats.getFeatures());
058 pigStatsGroup.put("HADOOP_VERSION", pigStats.getHadoopVersion());
059 pigStatsGroup.put("NUMBER_JOBS", Long.toString(pigStats.getNumberJobs()));
060 pigStatsGroup.put("PIG_VERSION", pigStats.getPigVersion());
061 pigStatsGroup.put("PROACTIVE_SPILL_COUNT_OBJECTS", Long.toString(pigStats.getProactiveSpillCountObjects()));
062 pigStatsGroup.put("PROACTIVE_SPILL_COUNT_RECORDS", Long.toString(pigStats.getProactiveSpillCountRecords()));
063 pigStatsGroup.put("RECORD_WRITTEN", Long.toString(pigStats.getRecordWritten()));
064 pigStatsGroup.put("RETURN_CODE", Long.toString(pigStats.getReturnCode()));
065 pigStatsGroup.put("SCRIPT_ID", pigStats.getScriptId());
066 pigStatsGroup.put("SMM_SPILL_COUNT", Long.toString(pigStats.getSMMSpillCount()));
067
068 PigStats.JobGraph jobGraph = pigStats.getJobGraph();
069 StringBuffer sb = new StringBuffer();
070 String separator = ",";
071
072 for (JobStats jobStats : jobGraph) {
073 // Get all the HadoopIds and put them as comma separated string for JOB_GRAPH
074 String hadoopId = jobStats.getJobId();
075 if (sb.length() > 0) {
076 sb.append(separator);
077 }
078 sb.append(hadoopId);
079 // Hadoop Counters for pig created MR job
080 pigStatsGroup.put(hadoopId, toJSONFromJobStats(jobStats));
081 }
082 pigStatsGroup.put("JOB_GRAPH", sb.toString());
083 return pigStatsGroup.toJSONString();
084 }
085
086 // MR job related counters
087 @SuppressWarnings("unchecked")
088 private static JSONObject toJSONFromJobStats(JobStats jobStats) {
089 JSONObject jobStatsGroup = new JSONObject();
090
091 // hadoop counters
092 jobStatsGroup.put(PigStatsUtil.HDFS_BYTES_WRITTEN, Long.toString(jobStats.getHdfsBytesWritten()));
093 jobStatsGroup.put(PigStatsUtil.MAP_INPUT_RECORDS, Long.toString(jobStats.getMapInputRecords()));
094 jobStatsGroup.put(PigStatsUtil.MAP_OUTPUT_RECORDS, Long.toString(jobStats.getMapOutputRecords()));
095 jobStatsGroup.put(PigStatsUtil.REDUCE_INPUT_RECORDS, Long.toString(jobStats.getReduceInputRecords()));
096 jobStatsGroup.put(PigStatsUtil.REDUCE_OUTPUT_RECORDS, Long.toString(jobStats.getReduceOutputRecords()));
097 // currently returns null; pig bug
098 jobStatsGroup.put("HADOOP_COUNTERS", toJSONFromCounters(jobStats.getHadoopCounters()));
099
100 // pig generated hadoop counters and other stats
101 jobStatsGroup.put("Alias", jobStats.getAlias());
102 jobStatsGroup.put("AVG_MAP_TIME", Long.toString(jobStats.getAvgMapTime()));
103 jobStatsGroup.put("AVG_REDUCE_TIME", Long.toString(jobStats.getAvgREduceTime()));
104 jobStatsGroup.put("BYTES_WRITTEN", Long.toString(jobStats.getBytesWritten()));
105 jobStatsGroup.put("ERROR_MESSAGE", jobStats.getErrorMessage());
106 jobStatsGroup.put("FEATURE", jobStats.getFeature());
107 jobStatsGroup.put("JOB_ID", jobStats.getJobId());
108 jobStatsGroup.put("MAX_MAP_TIME", Long.toString(jobStats.getMaxMapTime()));
109 jobStatsGroup.put("MIN_MAP_TIME", Long.toString(jobStats.getMinMapTime()));
110 jobStatsGroup.put("MAX_REDUCE_TIME", Long.toString(jobStats.getMaxReduceTime()));
111 jobStatsGroup.put("MIN_REDUCE_TIME", Long.toString(jobStats.getMinReduceTime()));
112 jobStatsGroup.put("NUMBER_MAPS", Long.toString(jobStats.getNumberMaps()));
113 jobStatsGroup.put("NUMBER_REDUCES", Long.toString(jobStats.getNumberReduces()));
114 jobStatsGroup.put("PROACTIVE_SPILL_COUNT_OBJECTS", Long.toString(jobStats.getProactiveSpillCountObjects()));
115 jobStatsGroup.put("PROACTIVE_SPILL_COUNT_RECS", Long.toString(jobStats.getProactiveSpillCountRecs()));
116 jobStatsGroup.put("RECORD_WRITTEN", Long.toString(jobStats.getRecordWrittern()));
117 jobStatsGroup.put("SMMS_SPILL_COUNT", Long.toString(jobStats.getSMMSpillCount()));
118 jobStatsGroup.put("MULTI_STORE_COUNTERS", toJSONFromMultiStoreCounters(jobStats.getMultiStoreCounters()));
119
120 return jobStatsGroup;
121
122 }
123
124 // multistorecounters to JSON
125 @SuppressWarnings("unchecked")
126 private static JSONObject toJSONFromMultiStoreCounters(Map<String, Long> map) {
127 JSONObject group = new JSONObject();
128 for (String cName : map.keySet()) {
129 group.put(cName, map.get(cName));
130 }
131 return group;
132
133 }
134
135 // hadoop counters to JSON
136 @SuppressWarnings("unchecked")
137 private static JSONObject toJSONFromCounters(Counters counters) {
138 if (counters == null) {
139 return null;
140 }
141
142 JSONObject groups = new JSONObject();
143 for (String gName : counters.getGroupNames()) {
144 JSONObject group = new JSONObject();
145 for (Counter counter : counters.getGroup(gName)) {
146 String cName = counter.getName();
147 Long cValue = counter.getValue();
148 group.put(cName, Long.toString(cValue));
149 }
150 groups.put(gName, group);
151 }
152 return groups;
153
154 }
155
156 }