This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.hadoop;
019
020 import java.io.IOException;
021 import java.util.ArrayList;
022 import java.util.List;
023 import java.util.Map;
024
025 import org.apache.hadoop.mapred.Counters;
026 import org.apache.hadoop.mapred.Counters.Counter;
027 import org.apache.oozie.util.ParamChecker;
028 import org.apache.pig.tools.pigstats.JobStats;
029 import org.apache.pig.tools.pigstats.PigStats;
030 import org.apache.pig.tools.pigstats.PigStatsUtil;
031 import org.json.simple.JSONObject;
032
033 /**
034 * Class to collect the Pig statistics for a Pig action
035 *
036 */
037 public class OoziePigStats extends ActionStats {
038 PigStats pigStats = null;
039
040 public OoziePigStats(PigStats pigStats) {
041 this.currentActionType = ActionType.PIG;
042 this.pigStats = pigStats;
043 }
044
045 /**
046 * The PigStats API is used to collect the statistics and the result is returned as a JSON String.
047 *
048 * @return a JSON string
049 */
050 @SuppressWarnings("unchecked")
051 @Override
052 public String toJSON() {
053 JSONObject pigStatsGroup = new JSONObject();
054 pigStatsGroup.put("ACTION_TYPE", getCurrentActionType().toString());
055
056 // pig summary related counters
057 pigStatsGroup.put("BYTES_WRITTEN", Long.toString(pigStats.getBytesWritten()));
058 pigStatsGroup.put("DURATION", Long.toString(pigStats.getDuration()));
059 pigStatsGroup.put("ERROR_CODE", Long.toString(pigStats.getErrorCode()));
060 pigStatsGroup.put("ERROR_MESSAGE", pigStats.getErrorMessage());
061 pigStatsGroup.put("FEATURES", pigStats.getFeatures());
062 pigStatsGroup.put("HADOOP_VERSION", pigStats.getHadoopVersion());
063 pigStatsGroup.put("NUMBER_JOBS", Long.toString(pigStats.getNumberJobs()));
064 pigStatsGroup.put("PIG_VERSION", pigStats.getPigVersion());
065 pigStatsGroup.put("PROACTIVE_SPILL_COUNT_OBJECTS", Long.toString(pigStats.getProactiveSpillCountObjects()));
066 pigStatsGroup.put("PROACTIVE_SPILL_COUNT_RECORDS", Long.toString(pigStats.getProactiveSpillCountRecords()));
067 pigStatsGroup.put("RECORD_WRITTEN", Long.toString(pigStats.getRecordWritten()));
068 pigStatsGroup.put("RETURN_CODE", Long.toString(pigStats.getReturnCode()));
069 pigStatsGroup.put("SCRIPT_ID", pigStats.getScriptId());
070 pigStatsGroup.put("SMM_SPILL_COUNT", Long.toString(pigStats.getSMMSpillCount()));
071
072 PigStats.JobGraph jobGraph = pigStats.getJobGraph();
073 StringBuffer sb = new StringBuffer();
074 String separator = ",";
075
076 for (JobStats jobStats : jobGraph) {
077 // Get all the HadoopIds and put them as comma separated string for JOB_GRAPH
078 String hadoopId = jobStats.getJobId();
079 if (sb.length() > 0) {
080 sb.append(separator);
081 }
082 sb.append(hadoopId);
083 // Hadoop Counters for pig created MR job
084 pigStatsGroup.put(hadoopId, toJSONFromJobStats(jobStats));
085 }
086 pigStatsGroup.put("JOB_GRAPH", sb.toString());
087 return pigStatsGroup.toJSONString();
088 }
089
090 // MR job related counters
091 @SuppressWarnings("unchecked")
092 private static JSONObject toJSONFromJobStats(JobStats jobStats) {
093 JSONObject jobStatsGroup = new JSONObject();
094
095 // hadoop counters
096 jobStatsGroup.put(PigStatsUtil.HDFS_BYTES_WRITTEN, Long.toString(jobStats.getHdfsBytesWritten()));
097 jobStatsGroup.put(PigStatsUtil.MAP_INPUT_RECORDS, Long.toString(jobStats.getMapInputRecords()));
098 jobStatsGroup.put(PigStatsUtil.MAP_OUTPUT_RECORDS, Long.toString(jobStats.getMapOutputRecords()));
099 jobStatsGroup.put(PigStatsUtil.REDUCE_INPUT_RECORDS, Long.toString(jobStats.getReduceInputRecords()));
100 jobStatsGroup.put(PigStatsUtil.REDUCE_OUTPUT_RECORDS, Long.toString(jobStats.getReduceOutputRecords()));
101 // currently returns null; pig bug
102 jobStatsGroup.put("HADOOP_COUNTERS", toJSONFromCounters(jobStats.getHadoopCounters()));
103
104 // pig generated hadoop counters and other stats
105 jobStatsGroup.put("Alias", jobStats.getAlias());
106 jobStatsGroup.put("AVG_MAP_TIME", Long.toString(jobStats.getAvgMapTime()));
107 jobStatsGroup.put("AVG_REDUCE_TIME", Long.toString(jobStats.getAvgREduceTime()));
108 jobStatsGroup.put("BYTES_WRITTEN", Long.toString(jobStats.getBytesWritten()));
109 jobStatsGroup.put("ERROR_MESSAGE", jobStats.getErrorMessage());
110 jobStatsGroup.put("FEATURE", jobStats.getFeature());
111 jobStatsGroup.put("JOB_ID", jobStats.getJobId());
112 jobStatsGroup.put("MAX_MAP_TIME", Long.toString(jobStats.getMaxMapTime()));
113 jobStatsGroup.put("MIN_MAP_TIME", Long.toString(jobStats.getMinMapTime()));
114 jobStatsGroup.put("MAX_REDUCE_TIME", Long.toString(jobStats.getMaxReduceTime()));
115 jobStatsGroup.put("MIN_REDUCE_TIME", Long.toString(jobStats.getMinReduceTime()));
116 jobStatsGroup.put("NUMBER_MAPS", Long.toString(jobStats.getNumberMaps()));
117 jobStatsGroup.put("NUMBER_REDUCES", Long.toString(jobStats.getNumberReduces()));
118 jobStatsGroup.put("PROACTIVE_SPILL_COUNT_OBJECTS", Long.toString(jobStats.getProactiveSpillCountObjects()));
119 jobStatsGroup.put("PROACTIVE_SPILL_COUNT_RECS", Long.toString(jobStats.getProactiveSpillCountRecs()));
120 jobStatsGroup.put("RECORD_WRITTEN", Long.toString(jobStats.getRecordWrittern()));
121 jobStatsGroup.put("SMMS_SPILL_COUNT", Long.toString(jobStats.getSMMSpillCount()));
122 jobStatsGroup.put("MULTI_STORE_COUNTERS", toJSONFromMultiStoreCounters(jobStats.getMultiStoreCounters()));
123
124 return jobStatsGroup;
125
126 }
127
128 // multistorecounters to JSON
129 @SuppressWarnings("unchecked")
130 private static JSONObject toJSONFromMultiStoreCounters(Map<String, Long> map) {
131 JSONObject group = new JSONObject();
132 for (String cName : map.keySet()) {
133 group.put(cName, map.get(cName));
134 }
135 return group;
136
137 }
138
139 // hadoop counters to JSON
140 @SuppressWarnings("unchecked")
141 private static JSONObject toJSONFromCounters(Counters counters) {
142 if (counters == null) {
143 return null;
144 }
145
146 JSONObject groups = new JSONObject();
147 for (String gName : counters.getGroupNames()) {
148 JSONObject group = new JSONObject();
149 for (Counter counter : counters.getGroup(gName)) {
150 String cName = counter.getName();
151 Long cValue = counter.getValue();
152 group.put(cName, Long.toString(cValue));
153 }
154 groups.put(gName, group);
155 }
156 return groups;
157
158 }
159
160 }