001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.hadoop;
019    
020    import java.util.Map;
021    
022    import org.apache.hadoop.mapred.Counters;
023    import org.apache.hadoop.mapred.Counters.Counter;
024    import org.apache.pig.tools.pigstats.JobStats;
025    import org.apache.pig.tools.pigstats.PigStats;
026    import org.apache.pig.tools.pigstats.PigStatsUtil;
027    import org.json.simple.JSONObject;
028    
029    /**
030     * Class to collect the Pig statistics for a Pig action
031     *
032     */
033    public class OoziePigStats extends ActionStats {
034        PigStats pigStats = null;
035    
036        public OoziePigStats(PigStats pigStats) {
037            this.currentActionType = ActionType.PIG;
038            this.pigStats = pigStats;
039        }
040    
041        /**
042         * The PigStats API is used to collect the statistics and the result is returned as a JSON String.
043         *
044         * @return a JSON string
045         */
046        @SuppressWarnings("unchecked")
047        @Override
048        public String toJSON() {
049            JSONObject pigStatsGroup = new JSONObject();
050            pigStatsGroup.put("ACTION_TYPE", getCurrentActionType().toString());
051    
052            // pig summary related counters
053            pigStatsGroup.put("BYTES_WRITTEN", Long.toString(pigStats.getBytesWritten()));
054            pigStatsGroup.put("DURATION", Long.toString(pigStats.getDuration()));
055            pigStatsGroup.put("ERROR_CODE", Long.toString(pigStats.getErrorCode()));
056            pigStatsGroup.put("ERROR_MESSAGE", pigStats.getErrorMessage());
057            pigStatsGroup.put("FEATURES", pigStats.getFeatures());
058            pigStatsGroup.put("HADOOP_VERSION", pigStats.getHadoopVersion());
059            pigStatsGroup.put("NUMBER_JOBS", Long.toString(pigStats.getNumberJobs()));
060            pigStatsGroup.put("PIG_VERSION", pigStats.getPigVersion());
061            pigStatsGroup.put("PROACTIVE_SPILL_COUNT_OBJECTS", Long.toString(pigStats.getProactiveSpillCountObjects()));
062            pigStatsGroup.put("PROACTIVE_SPILL_COUNT_RECORDS", Long.toString(pigStats.getProactiveSpillCountRecords()));
063            pigStatsGroup.put("RECORD_WRITTEN", Long.toString(pigStats.getRecordWritten()));
064            pigStatsGroup.put("RETURN_CODE", Long.toString(pigStats.getReturnCode()));
065            pigStatsGroup.put("SCRIPT_ID", pigStats.getScriptId());
066            pigStatsGroup.put("SMM_SPILL_COUNT", Long.toString(pigStats.getSMMSpillCount()));
067    
068            PigStats.JobGraph jobGraph = pigStats.getJobGraph();
069            StringBuffer sb = new StringBuffer();
070            String separator = ",";
071    
072            for (JobStats jobStats : jobGraph) {
073                // Get all the HadoopIds and put them as comma separated string for JOB_GRAPH
074                String hadoopId = jobStats.getJobId();
075                if (sb.length() > 0) {
076                    sb.append(separator);
077                }
078                sb.append(hadoopId);
079                // Hadoop Counters for pig created MR job
080                pigStatsGroup.put(hadoopId, toJSONFromJobStats(jobStats));
081            }
082            pigStatsGroup.put("JOB_GRAPH", sb.toString());
083            return pigStatsGroup.toJSONString();
084        }
085    
086        // MR job related counters
087        @SuppressWarnings("unchecked")
088        private static JSONObject toJSONFromJobStats(JobStats jobStats) {
089            JSONObject jobStatsGroup = new JSONObject();
090    
091            // hadoop counters
092            jobStatsGroup.put(PigStatsUtil.HDFS_BYTES_WRITTEN, Long.toString(jobStats.getHdfsBytesWritten()));
093            jobStatsGroup.put(PigStatsUtil.MAP_INPUT_RECORDS, Long.toString(jobStats.getMapInputRecords()));
094            jobStatsGroup.put(PigStatsUtil.MAP_OUTPUT_RECORDS, Long.toString(jobStats.getMapOutputRecords()));
095            jobStatsGroup.put(PigStatsUtil.REDUCE_INPUT_RECORDS, Long.toString(jobStats.getReduceInputRecords()));
096            jobStatsGroup.put(PigStatsUtil.REDUCE_OUTPUT_RECORDS, Long.toString(jobStats.getReduceOutputRecords()));
097            // currently returns null; pig bug
098            jobStatsGroup.put("HADOOP_COUNTERS", toJSONFromCounters(jobStats.getHadoopCounters()));
099    
100            // pig generated hadoop counters and other stats
101            jobStatsGroup.put("Alias", jobStats.getAlias());
102            jobStatsGroup.put("AVG_MAP_TIME", Long.toString(jobStats.getAvgMapTime()));
103            jobStatsGroup.put("AVG_REDUCE_TIME", Long.toString(jobStats.getAvgREduceTime()));
104            jobStatsGroup.put("BYTES_WRITTEN", Long.toString(jobStats.getBytesWritten()));
105            jobStatsGroup.put("ERROR_MESSAGE", jobStats.getErrorMessage());
106            jobStatsGroup.put("FEATURE", jobStats.getFeature());
107            jobStatsGroup.put("JOB_ID", jobStats.getJobId());
108            jobStatsGroup.put("MAX_MAP_TIME", Long.toString(jobStats.getMaxMapTime()));
109            jobStatsGroup.put("MIN_MAP_TIME", Long.toString(jobStats.getMinMapTime()));
110            jobStatsGroup.put("MAX_REDUCE_TIME", Long.toString(jobStats.getMaxReduceTime()));
111            jobStatsGroup.put("MIN_REDUCE_TIME", Long.toString(jobStats.getMinReduceTime()));
112            jobStatsGroup.put("NUMBER_MAPS", Long.toString(jobStats.getNumberMaps()));
113            jobStatsGroup.put("NUMBER_REDUCES", Long.toString(jobStats.getNumberReduces()));
114            jobStatsGroup.put("PROACTIVE_SPILL_COUNT_OBJECTS", Long.toString(jobStats.getProactiveSpillCountObjects()));
115            jobStatsGroup.put("PROACTIVE_SPILL_COUNT_RECS", Long.toString(jobStats.getProactiveSpillCountRecs()));
116            jobStatsGroup.put("RECORD_WRITTEN", Long.toString(jobStats.getRecordWrittern()));
117            jobStatsGroup.put("SMMS_SPILL_COUNT", Long.toString(jobStats.getSMMSpillCount()));
118            jobStatsGroup.put("MULTI_STORE_COUNTERS", toJSONFromMultiStoreCounters(jobStats.getMultiStoreCounters()));
119    
120            return jobStatsGroup;
121    
122        }
123    
124        // multistorecounters to JSON
125        @SuppressWarnings("unchecked")
126        private static JSONObject toJSONFromMultiStoreCounters(Map<String, Long> map) {
127            JSONObject group = new JSONObject();
128            for (String cName : map.keySet()) {
129                group.put(cName, map.get(cName));
130            }
131            return group;
132    
133        }
134    
135        // hadoop counters to JSON
136        @SuppressWarnings("unchecked")
137        private static JSONObject toJSONFromCounters(Counters counters) {
138            if (counters == null) {
139                return null;
140            }
141    
142            JSONObject groups = new JSONObject();
143            for (String gName : counters.getGroupNames()) {
144                JSONObject group = new JSONObject();
145                for (Counter counter : counters.getGroup(gName)) {
146                    String cName = counter.getName();
147                    Long cValue = counter.getValue();
148                    group.put(cName, Long.toString(cValue));
149                }
150                groups.put(gName, group);
151            }
152            return groups;
153    
154        }
155    
156    }