001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import java.io.IOException;
021    import java.util.ArrayList;
022    import java.util.List;
023    import java.util.Map;
024    
025    import org.apache.hadoop.mapred.Counters;
026    import org.apache.hadoop.mapred.Counters.Counter;
027    import org.apache.oozie.util.ParamChecker;
028    import org.apache.pig.tools.pigstats.JobStats;
029    import org.apache.pig.tools.pigstats.PigStats;
030    import org.apache.pig.tools.pigstats.PigStatsUtil;
031    import org.json.simple.JSONObject;
032    
033    /**
034     * Class to collect the Pig statistics for a Pig action
035     *
036     */
037    public class OoziePigStats extends ActionStats {
038        PigStats pigStats = null;
039    
040        public OoziePigStats(PigStats pigStats) {
041            this.currentActionType = ActionType.PIG;
042            this.pigStats = pigStats;
043        }
044    
045        /**
046         * The PigStats API is used to collect the statistics and the result is returned as a JSON String.
047         *
048         * @return a JSON string
049         */
050        @SuppressWarnings("unchecked")
051        @Override
052        public String toJSON() {
053            JSONObject pigStatsGroup = new JSONObject();
054            pigStatsGroup.put("ACTION_TYPE", getCurrentActionType().toString());
055    
056            // pig summary related counters
057            pigStatsGroup.put("BYTES_WRITTEN", Long.toString(pigStats.getBytesWritten()));
058            pigStatsGroup.put("DURATION", Long.toString(pigStats.getDuration()));
059            pigStatsGroup.put("ERROR_CODE", Long.toString(pigStats.getErrorCode()));
060            pigStatsGroup.put("ERROR_MESSAGE", pigStats.getErrorMessage());
061            pigStatsGroup.put("FEATURES", pigStats.getFeatures());
062            pigStatsGroup.put("HADOOP_VERSION", pigStats.getHadoopVersion());
063            pigStatsGroup.put("NUMBER_JOBS", Long.toString(pigStats.getNumberJobs()));
064            pigStatsGroup.put("PIG_VERSION", pigStats.getPigVersion());
065            pigStatsGroup.put("PROACTIVE_SPILL_COUNT_OBJECTS", Long.toString(pigStats.getProactiveSpillCountObjects()));
066            pigStatsGroup.put("PROACTIVE_SPILL_COUNT_RECORDS", Long.toString(pigStats.getProactiveSpillCountRecords()));
067            pigStatsGroup.put("RECORD_WRITTEN", Long.toString(pigStats.getRecordWritten()));
068            pigStatsGroup.put("RETURN_CODE", Long.toString(pigStats.getReturnCode()));
069            pigStatsGroup.put("SCRIPT_ID", pigStats.getScriptId());
070            pigStatsGroup.put("SMM_SPILL_COUNT", Long.toString(pigStats.getSMMSpillCount()));
071    
072            PigStats.JobGraph jobGraph = pigStats.getJobGraph();
073            StringBuffer sb = new StringBuffer();
074            String separator = ",";
075    
076            for (JobStats jobStats : jobGraph) {
077                // Get all the HadoopIds and put them as comma separated string for JOB_GRAPH
078                String hadoopId = jobStats.getJobId();
079                if (sb.length() > 0) {
080                    sb.append(separator);
081                }
082                sb.append(hadoopId);
083                // Hadoop Counters for pig created MR job
084                pigStatsGroup.put(hadoopId, toJSONFromJobStats(jobStats));
085            }
086            pigStatsGroup.put("JOB_GRAPH", sb.toString());
087            return pigStatsGroup.toJSONString();
088        }
089    
090        // MR job related counters
091        @SuppressWarnings("unchecked")
092        private static JSONObject toJSONFromJobStats(JobStats jobStats) {
093            JSONObject jobStatsGroup = new JSONObject();
094    
095            // hadoop counters
096            jobStatsGroup.put(PigStatsUtil.HDFS_BYTES_WRITTEN, Long.toString(jobStats.getHdfsBytesWritten()));
097            jobStatsGroup.put(PigStatsUtil.MAP_INPUT_RECORDS, Long.toString(jobStats.getMapInputRecords()));
098            jobStatsGroup.put(PigStatsUtil.MAP_OUTPUT_RECORDS, Long.toString(jobStats.getMapOutputRecords()));
099            jobStatsGroup.put(PigStatsUtil.REDUCE_INPUT_RECORDS, Long.toString(jobStats.getReduceInputRecords()));
100            jobStatsGroup.put(PigStatsUtil.REDUCE_OUTPUT_RECORDS, Long.toString(jobStats.getReduceOutputRecords()));
101            // currently returns null; pig bug
102            jobStatsGroup.put("HADOOP_COUNTERS", toJSONFromCounters(jobStats.getHadoopCounters()));
103    
104            // pig generated hadoop counters and other stats
105            jobStatsGroup.put("Alias", jobStats.getAlias());
106            jobStatsGroup.put("AVG_MAP_TIME", Long.toString(jobStats.getAvgMapTime()));
107            jobStatsGroup.put("AVG_REDUCE_TIME", Long.toString(jobStats.getAvgREduceTime()));
108            jobStatsGroup.put("BYTES_WRITTEN", Long.toString(jobStats.getBytesWritten()));
109            jobStatsGroup.put("ERROR_MESSAGE", jobStats.getErrorMessage());
110            jobStatsGroup.put("FEATURE", jobStats.getFeature());
111            jobStatsGroup.put("JOB_ID", jobStats.getJobId());
112            jobStatsGroup.put("MAX_MAP_TIME", Long.toString(jobStats.getMaxMapTime()));
113            jobStatsGroup.put("MIN_MAP_TIME", Long.toString(jobStats.getMinMapTime()));
114            jobStatsGroup.put("MAX_REDUCE_TIME", Long.toString(jobStats.getMaxReduceTime()));
115            jobStatsGroup.put("MIN_REDUCE_TIME", Long.toString(jobStats.getMinReduceTime()));
116            jobStatsGroup.put("NUMBER_MAPS", Long.toString(jobStats.getNumberMaps()));
117            jobStatsGroup.put("NUMBER_REDUCES", Long.toString(jobStats.getNumberReduces()));
118            jobStatsGroup.put("PROACTIVE_SPILL_COUNT_OBJECTS", Long.toString(jobStats.getProactiveSpillCountObjects()));
119            jobStatsGroup.put("PROACTIVE_SPILL_COUNT_RECS", Long.toString(jobStats.getProactiveSpillCountRecs()));
120            jobStatsGroup.put("RECORD_WRITTEN", Long.toString(jobStats.getRecordWrittern()));
121            jobStatsGroup.put("SMMS_SPILL_COUNT", Long.toString(jobStats.getSMMSpillCount()));
122            jobStatsGroup.put("MULTI_STORE_COUNTERS", toJSONFromMultiStoreCounters(jobStats.getMultiStoreCounters()));
123    
124            return jobStatsGroup;
125    
126        }
127    
128        // multistorecounters to JSON
129        @SuppressWarnings("unchecked")
130        private static JSONObject toJSONFromMultiStoreCounters(Map<String, Long> map) {
131            JSONObject group = new JSONObject();
132            for (String cName : map.keySet()) {
133                group.put(cName, map.get(cName));
134            }
135            return group;
136    
137        }
138    
139        // hadoop counters to JSON
140        @SuppressWarnings("unchecked")
141        private static JSONObject toJSONFromCounters(Counters counters) {
142            if (counters == null) {
143                return null;
144            }
145    
146            JSONObject groups = new JSONObject();
147            for (String gName : counters.getGroupNames()) {
148                JSONObject group = new JSONObject();
149                for (Counter counter : counters.getGroup(gName)) {
150                    String cName = counter.getName();
151                    Long cValue = counter.getValue();
152                    group.put(cName, Long.toString(cValue));
153                }
154                groups.put(gName, group);
155            }
156            return groups;
157    
158        }
159    
160    }