/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import java.util.Map;

import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.json.simple.JSONObject;

/**
 * Class to collect the Pig statistics for a Pig action.
 * <p>
 * Summary values are read from the wrapped {@link PigStats}. Every job in the Pig job graph
 * contributes a nested JSON object keyed by its Hadoop job id. Numeric statistics are
 * serialized as decimal strings.
 */
public class OoziePigStats extends ActionStats {

    /** Separator used between Hadoop job ids in the {@code JOB_GRAPH} entry. */
    private static final String JOB_ID_SEPARATOR = ",";

    PigStats pigStats = null;

    /**
     * Creates the statistics holder for a Pig action.
     *
     * @param pigStats the statistics object whose values are serialized by {@link #toJSON()}
     */
    public OoziePigStats(PigStats pigStats) {
        this.currentActionType = ActionType.PIG;
        this.pigStats = pigStats;
    }

    /**
     * The PigStats API is used to collect the statistics and the result is returned as a JSON String.
     * <p>
     * The result contains the action type and the Pig summary values. It also contains
     * {@code JOB_GRAPH}, a comma separated list of the job ids in the job graph, plus one nested
     * object per job, keyed by that job's id.
     *
     * @return a JSON string
     */
    @SuppressWarnings("unchecked")
    @Override
    public String toJSON() {
        JSONObject pigStatsGroup = new JSONObject();
        pigStatsGroup.put("ACTION_TYPE", getCurrentActionType().toString());

        // pig summary related counters
        pigStatsGroup.put("BYTES_WRITTEN", Long.toString(pigStats.getBytesWritten()));
        pigStatsGroup.put("DURATION", Long.toString(pigStats.getDuration()));
        pigStatsGroup.put("ERROR_CODE", Long.toString(pigStats.getErrorCode()));
        pigStatsGroup.put("ERROR_MESSAGE", pigStats.getErrorMessage());
        pigStatsGroup.put("FEATURES", pigStats.getFeatures());
        pigStatsGroup.put("HADOOP_VERSION", pigStats.getHadoopVersion());
        pigStatsGroup.put("NUMBER_JOBS", Long.toString(pigStats.getNumberJobs()));
        pigStatsGroup.put("PIG_VERSION", pigStats.getPigVersion());
        pigStatsGroup.put("PROACTIVE_SPILL_COUNT_OBJECTS", Long.toString(pigStats.getProactiveSpillCountObjects()));
        pigStatsGroup.put("PROACTIVE_SPILL_COUNT_RECORDS", Long.toString(pigStats.getProactiveSpillCountRecords()));
        pigStatsGroup.put("RECORD_WRITTEN", Long.toString(pigStats.getRecordWritten()));
        pigStatsGroup.put("RETURN_CODE", Long.toString(pigStats.getReturnCode()));
        pigStatsGroup.put("SCRIPT_ID", pigStats.getScriptId());
        pigStatsGroup.put("SMM_SPILL_COUNT", Long.toString(pigStats.getSMMSpillCount()));

        // Local, single-threaded buffer: StringBuilder avoids StringBuffer's needless locking.
        StringBuilder jobIds = new StringBuilder();
        PigStats.JobGraph jobGraph = pigStats.getJobGraph();
        // Guard against a missing job graph so serialization still produces the summary values.
        if (jobGraph != null) {
            for (JobStats jobStats : jobGraph) {
                // Get all the HadoopIds and put them as comma separated string for JOB_GRAPH
                String hadoopId = jobStats.getJobId();
                if (jobIds.length() > 0) {
                    jobIds.append(JOB_ID_SEPARATOR);
                }
                jobIds.append(hadoopId);
                // Hadoop Counters for pig created MR job
                pigStatsGroup.put(hadoopId, toJSONFromJobStats(jobStats));
            }
        }
        pigStatsGroup.put("JOB_GRAPH", jobIds.toString());
        return pigStatsGroup.toJSONString();
    }

    /**
     * Serializes the statistics of a single MR job created by Pig.
     * <p>
     * NOTE: several key names differ slightly from the summary-level keys (e.g.
     * {@code SMMS_SPILL_COUNT}, {@code PROACTIVE_SPILL_COUNT_RECS}, {@code Alias}). They are kept
     * verbatim because consumers of this JSON may depend on them.
     *
     * @param jobStats statistics of one job from the Pig job graph
     * @return a JSON object with the job's Hadoop and Pig-generated statistics
     */
    @SuppressWarnings("unchecked")
    private static JSONObject toJSONFromJobStats(JobStats jobStats) {
        JSONObject jobStatsGroup = new JSONObject();

        // hadoop counters
        jobStatsGroup.put(PigStatsUtil.HDFS_BYTES_WRITTEN, Long.toString(jobStats.getHdfsBytesWritten()));
        jobStatsGroup.put(PigStatsUtil.MAP_INPUT_RECORDS, Long.toString(jobStats.getMapInputRecords()));
        jobStatsGroup.put(PigStatsUtil.MAP_OUTPUT_RECORDS, Long.toString(jobStats.getMapOutputRecords()));
        jobStatsGroup.put(PigStatsUtil.REDUCE_INPUT_RECORDS, Long.toString(jobStats.getReduceInputRecords()));
        jobStatsGroup.put(PigStatsUtil.REDUCE_OUTPUT_RECORDS, Long.toString(jobStats.getReduceOutputRecords()));
        // getHadoopCounters() currently returns null (pig bug); toJSONFromCounters then yields null
        jobStatsGroup.put("HADOOP_COUNTERS", toJSONFromCounters(jobStats.getHadoopCounters()));

        // pig generated hadoop counters and other stats
        // (getAvgREduceTime / getRecordWrittern are spelled exactly as the Pig JobStats API names them)
        jobStatsGroup.put("Alias", jobStats.getAlias());
        jobStatsGroup.put("AVG_MAP_TIME", Long.toString(jobStats.getAvgMapTime()));
        jobStatsGroup.put("AVG_REDUCE_TIME", Long.toString(jobStats.getAvgREduceTime()));
        jobStatsGroup.put("BYTES_WRITTEN", Long.toString(jobStats.getBytesWritten()));
        jobStatsGroup.put("ERROR_MESSAGE", jobStats.getErrorMessage());
        jobStatsGroup.put("FEATURE", jobStats.getFeature());
        jobStatsGroup.put("JOB_ID", jobStats.getJobId());
        jobStatsGroup.put("MAX_MAP_TIME", Long.toString(jobStats.getMaxMapTime()));
        jobStatsGroup.put("MIN_MAP_TIME", Long.toString(jobStats.getMinMapTime()));
        jobStatsGroup.put("MAX_REDUCE_TIME", Long.toString(jobStats.getMaxReduceTime()));
        jobStatsGroup.put("MIN_REDUCE_TIME", Long.toString(jobStats.getMinReduceTime()));
        jobStatsGroup.put("NUMBER_MAPS", Long.toString(jobStats.getNumberMaps()));
        jobStatsGroup.put("NUMBER_REDUCES", Long.toString(jobStats.getNumberReduces()));
        jobStatsGroup.put("PROACTIVE_SPILL_COUNT_OBJECTS", Long.toString(jobStats.getProactiveSpillCountObjects()));
        jobStatsGroup.put("PROACTIVE_SPILL_COUNT_RECS", Long.toString(jobStats.getProactiveSpillCountRecs()));
        jobStatsGroup.put("RECORD_WRITTEN", Long.toString(jobStats.getRecordWrittern()));
        jobStatsGroup.put("SMMS_SPILL_COUNT", Long.toString(jobStats.getSMMSpillCount()));
        jobStatsGroup.put("MULTI_STORE_COUNTERS", toJSONFromMultiStoreCounters(jobStats.getMultiStoreCounters()));

        return jobStatsGroup;
    }

    /**
     * Converts the multi-store counters of a job to JSON.
     * <p>
     * Unlike the other counters, values are stored as {@code Long} (not strings), preserving the
     * existing output format.
     *
     * @param map counter name to value; {@code null} is treated as empty
     * @return a JSON object with one entry per counter, never {@code null}
     */
    @SuppressWarnings("unchecked")
    private static JSONObject toJSONFromMultiStoreCounters(Map<String, Long> map) {
        JSONObject group = new JSONObject();
        if (map == null) {
            return group;
        }
        // entrySet avoids a second hash lookup per key
        for (Map.Entry<String, Long> entry : map.entrySet()) {
            group.put(entry.getKey(), entry.getValue());
        }
        return group;
    }

    /**
     * Converts Hadoop counters to JSON, grouped by counter group name.
     *
     * @param counters the Hadoop counters; may be {@code null}
     * @return a JSON object mapping group name to an object of counter name to value (as a string),
     *         or {@code null} if {@code counters} is {@code null}
     */
    @SuppressWarnings("unchecked")
    private static JSONObject toJSONFromCounters(Counters counters) {
        if (counters == null) {
            return null;
        }

        JSONObject groups = new JSONObject();
        for (String gName : counters.getGroupNames()) {
            JSONObject group = new JSONObject();
            for (Counter counter : counters.getGroup(gName)) {
                // primitive long: no need to box the value only to unbox it for toString
                long cValue = counter.getValue();
                group.put(counter.getName(), Long.toString(cValue));
            }
            groups.put(gName, group);
        }
        return groups;
    }

}