/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.oozie.util.ParamChecker;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.json.simple.JSONObject;

/**
 * Class to collect the Pig statistics for a Pig action and render them as JSON.
 * <p>
 * The produced JSON object holds the script-level summary values, one nested object per job found in the
 * Pig job graph (keyed by that job's id), and a <code>JOB_GRAPH</code> entry listing those job ids.
 */
public class OoziePigStats extends ActionStats {
    // Source of every statistic reported by toJSON(); assigned by the constructor.
    PigStats pigStats = null;

    /**
     * Creates a statistics collector for a Pig action and marks the current action type as PIG.
     *
     * @param pigStats the Pig statistics object that {@link #toJSON()} reads from
     */
    public OoziePigStats(PigStats pigStats) {
        this.currentActionType = ActionType.PIG;
        this.pigStats = pigStats;
    }

    /**
     * The PigStats API is used to collect the statistics and the result is returned as a JSON String.
     * <p>
     * Numeric summary values are stored as decimal strings. Each job in the job graph contributes one
     * nested object keyed by its job id, and <code>JOB_GRAPH</code> holds the same ids joined with commas
     * in iteration order.
     *
     * @return a JSON string
     */
    @SuppressWarnings("unchecked")
    @Override
    public String toJSON() {
        JSONObject pigStatsGroup = new JSONObject();
        pigStatsGroup.put("ACTION_TYPE", getCurrentActionType().toString());

        // pig summary related counters
        pigStatsGroup.put("BYTES_WRITTEN", Long.toString(pigStats.getBytesWritten()));
        pigStatsGroup.put("DURATION", Long.toString(pigStats.getDuration()));
        pigStatsGroup.put("ERROR_CODE", Long.toString(pigStats.getErrorCode()));
        pigStatsGroup.put("ERROR_MESSAGE", pigStats.getErrorMessage());
        pigStatsGroup.put("FEATURES", pigStats.getFeatures());
        pigStatsGroup.put("HADOOP_VERSION", pigStats.getHadoopVersion());
        pigStatsGroup.put("NUMBER_JOBS", Long.toString(pigStats.getNumberJobs()));
        pigStatsGroup.put("PIG_VERSION", pigStats.getPigVersion());
        pigStatsGroup.put("PROACTIVE_SPILL_COUNT_OBJECTS", Long.toString(pigStats.getProactiveSpillCountObjects()));
        pigStatsGroup.put("PROACTIVE_SPILL_COUNT_RECORDS", Long.toString(pigStats.getProactiveSpillCountRecords()));
        pigStatsGroup.put("RECORD_WRITTEN", Long.toString(pigStats.getRecordWritten()));
        pigStatsGroup.put("RETURN_CODE", Long.toString(pigStats.getReturnCode()));
        pigStatsGroup.put("SCRIPT_ID", pigStats.getScriptId());
        pigStatsGroup.put("SMM_SPILL_COUNT", Long.toString(pigStats.getSMMSpillCount()));

        PigStats.JobGraph jobGraph = pigStats.getJobGraph();
        // The builder never leaves this method, so the synchronized StringBuffer is not needed.
        StringBuilder jobIds = new StringBuilder();
        String separator = ",";

        for (JobStats jobStats : jobGraph) {
            // Get all the HadoopIds and put them as comma separated string for JOB_GRAPH
            String hadoopId = jobStats.getJobId();
            if (jobIds.length() > 0) {
                jobIds.append(separator);
            }
            jobIds.append(hadoopId);
            // Hadoop Counters for pig created MR job
            pigStatsGroup.put(hadoopId, toJSONFromJobStats(jobStats));
        }
        pigStatsGroup.put("JOB_GRAPH", jobIds.toString());
        return pigStatsGroup.toJSONString();
    }

    /**
     * Builds the JSON object describing a single MR job created by Pig.
     *
     * @param jobStats statistics of one job taken from the Pig job graph
     * @return a JSON object with the job's hadoop counters and the Pig-generated statistics
     */
    @SuppressWarnings("unchecked")
    private static JSONObject toJSONFromJobStats(JobStats jobStats) {
        JSONObject jobStatsGroup = new JSONObject();

        // hadoop counters
        jobStatsGroup.put(PigStatsUtil.HDFS_BYTES_WRITTEN, Long.toString(jobStats.getHdfsBytesWritten()));
        jobStatsGroup.put(PigStatsUtil.MAP_INPUT_RECORDS, Long.toString(jobStats.getMapInputRecords()));
        jobStatsGroup.put(PigStatsUtil.MAP_OUTPUT_RECORDS, Long.toString(jobStats.getMapOutputRecords()));
        jobStatsGroup.put(PigStatsUtil.REDUCE_INPUT_RECORDS, Long.toString(jobStats.getReduceInputRecords()));
        jobStatsGroup.put(PigStatsUtil.REDUCE_OUTPUT_RECORDS, Long.toString(jobStats.getReduceOutputRecords()));
        // currently returns null; pig bug (toJSONFromCounters passes the null through)
        jobStatsGroup.put("HADOOP_COUNTERS", toJSONFromCounters(jobStats.getHadoopCounters()));

        // pig generated hadoop counters and other stats
        jobStatsGroup.put("Alias", jobStats.getAlias());
        jobStatsGroup.put("AVG_MAP_TIME", Long.toString(jobStats.getAvgMapTime()));
        jobStatsGroup.put("AVG_REDUCE_TIME", Long.toString(jobStats.getAvgREduceTime()));
        jobStatsGroup.put("BYTES_WRITTEN", Long.toString(jobStats.getBytesWritten()));
        jobStatsGroup.put("ERROR_MESSAGE", jobStats.getErrorMessage());
        jobStatsGroup.put("FEATURE", jobStats.getFeature());
        jobStatsGroup.put("JOB_ID", jobStats.getJobId());
        jobStatsGroup.put("MAX_MAP_TIME", Long.toString(jobStats.getMaxMapTime()));
        jobStatsGroup.put("MIN_MAP_TIME", Long.toString(jobStats.getMinMapTime()));
        jobStatsGroup.put("MAX_REDUCE_TIME", Long.toString(jobStats.getMaxReduceTime()));
        jobStatsGroup.put("MIN_REDUCE_TIME", Long.toString(jobStats.getMinReduceTime()));
        jobStatsGroup.put("NUMBER_MAPS", Long.toString(jobStats.getNumberMaps()));
        jobStatsGroup.put("NUMBER_REDUCES", Long.toString(jobStats.getNumberReduces()));
        jobStatsGroup.put("PROACTIVE_SPILL_COUNT_OBJECTS", Long.toString(jobStats.getProactiveSpillCountObjects()));
        jobStatsGroup.put("PROACTIVE_SPILL_COUNT_RECS", Long.toString(jobStats.getProactiveSpillCountRecs()));
        jobStatsGroup.put("RECORD_WRITTEN", Long.toString(jobStats.getRecordWrittern()));
        // NOTE(review): this key is "SMMS_SPILL_COUNT" while the script-level key in toJSON() is
        // "SMM_SPILL_COUNT"; left unchanged because consumers may rely on the emitted name.
        jobStatsGroup.put("SMMS_SPILL_COUNT", Long.toString(jobStats.getSMMSpillCount()));
        jobStatsGroup.put("MULTI_STORE_COUNTERS", toJSONFromMultiStoreCounters(jobStats.getMultiStoreCounters()));

        return jobStatsGroup;
    }

    /**
     * Converts the multi-store counters to JSON.
     * <p>
     * Unlike the other statistics, the counter values are stored as {@link Long} objects rather than
     * being converted to strings first.
     *
     * @param map counter name to counter value
     * @return a JSON object with one entry per counter
     */
    @SuppressWarnings("unchecked")
    private static JSONObject toJSONFromMultiStoreCounters(Map<String, Long> map) {
        JSONObject group = new JSONObject();
        // One bulk copy instead of a keySet() walk with a get() lookup per key.
        group.putAll(map);
        return group;
    }

    /**
     * Converts hadoop counters to JSON: one nested object per counter group, mapping each counter name
     * to its value rendered as a decimal string.
     *
     * @param counters the hadoop counters, may be <code>null</code>
     * @return the JSON representation, or <code>null</code> when <code>counters</code> is <code>null</code>
     */
    @SuppressWarnings("unchecked")
    private static JSONObject toJSONFromCounters(Counters counters) {
        if (counters == null) {
            return null;
        }

        JSONObject groups = new JSONObject();
        for (String gName : counters.getGroupNames()) {
            JSONObject group = new JSONObject();
            for (Counter counter : counters.getGroup(gName)) {
                // Kept as a primitive: the value is only needed for its string form.
                long cValue = counter.getValue();
                group.put(counter.getName(), Long.toString(cValue));
            }
            groups.put(gName, group);
        }
        return groups;
    }

}