001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.coord; 019 020import java.text.ParseException; 021import java.util.ArrayList; 022import java.util.Date; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Set; 026 027import org.apache.hadoop.conf.Configuration; 028import org.apache.oozie.CoordinatorActionBean; 029import org.apache.oozie.ErrorCode; 030import org.apache.oozie.XException; 031import org.apache.oozie.client.OozieClient; 032import org.apache.oozie.client.rest.RestConstants; 033import org.apache.oozie.command.CommandException; 034import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; 035import org.apache.oozie.executor.jpa.CoordJobGetActionForNominalTimeJPAExecutor; 036import org.apache.oozie.executor.jpa.JPAExecutorException; 037import org.apache.oozie.service.JPAService; 038import org.apache.oozie.service.Services; 039import org.apache.oozie.service.XLogService; 040import org.apache.oozie.util.CoordActionsInDateRange; 041import org.apache.oozie.util.DateUtils; 042import org.apache.oozie.util.ParamChecker; 043import org.apache.oozie.util.XLog; 044import org.jdom.Element; 045 046public class CoordUtils { 047 public static final String HADOOP_USER = "user.name"; 048 049 public static String getDoneFlag(Element doneFlagElement) { 050 if (doneFlagElement != null) { 051 return doneFlagElement.getTextTrim(); 052 } 053 else { 054 return CoordELConstants.DEFAULT_DONE_FLAG; 055 } 056 } 057 058 public static Configuration getHadoopConf(Configuration jobConf) { 059 Configuration conf = new Configuration(); 060 ParamChecker.notNull(jobConf, "Configuration to be used for hadoop setup "); 061 String user = ParamChecker.notEmpty(jobConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 062 conf.set(HADOOP_USER, user); 063 return conf; 064 } 065 066 /** 067 * Get the list of actions for a given coordinator job 068 * @param rangeType the rerun type (date, action) 069 * @param jobId the coordinator job id 070 * @param scope the date scope or action id scope 071 * @return the list of Coordinator actions 072 * @throws CommandException 073 */ 074 public static List<CoordinatorActionBean> getCoordActions(String rangeType, String jobId, String scope, 075 boolean active) throws CommandException { 076 List<CoordinatorActionBean> coordActions = null; 077 if (rangeType.equals(RestConstants.JOB_COORD_SCOPE_DATE)) { 078 coordActions = CoordUtils.getCoordActionsFromDates(jobId, scope, active); 079 } 080 else if (rangeType.equals(RestConstants.JOB_COORD_SCOPE_ACTION)) { 081 coordActions = CoordUtils.getCoordActionsFromIds(jobId, scope); 082 } 083 return coordActions; 084 } 085 086 /** 087 * Get the list of actions for given date ranges 088 * 089 * @param jobId coordinator job id 090 * @param scope a comma-separated list of date ranges. Each date range element is specified with two dates separated by '::' 091 * @return the list of Coordinator actions for the date range 092 * @throws CommandException thrown if failed to get coordinator actions by given date range 093 */ 094 static List<CoordinatorActionBean> getCoordActionsFromDates(String jobId, String scope, boolean active) 095 throws CommandException { 096 JPAService jpaService = Services.get().get(JPAService.class); 097 ParamChecker.notEmpty(jobId, "jobId"); 098 ParamChecker.notEmpty(scope, "scope"); 099 100 Set<CoordinatorActionBean> actionSet = new HashSet<CoordinatorActionBean>(); 101 String[] list = scope.split(","); 102 for (String s : list) { 103 s = s.trim(); 104 // A date range is specified with two dates separated by '::' 105 if (s.contains("::")) { 106 List<CoordinatorActionBean> listOfActions; 107 try { 108 // Get list of actions within the range of date 109 listOfActions = CoordActionsInDateRange.getCoordActionsFromDateRange(jobId, s, active); 110 } 111 catch (XException e) { 112 throw new CommandException(e); 113 } 114 actionSet.addAll(listOfActions); 115 } 116 else { 117 try { 118 // Get action for the nominal time 119 Date date = DateUtils.parseDateOozieTZ(s.trim()); 120 CoordinatorActionBean coordAction = jpaService 121 .execute(new CoordJobGetActionForNominalTimeJPAExecutor(jobId, date)); 122 123 if (coordAction != null) { 124 actionSet.add(coordAction); 125 } 126 else { 127 throw new RuntimeException("This should never happen, Coordinator Action shouldn't be null"); 128 } 129 } 130 catch (ParseException e) { 131 throw new CommandException(ErrorCode.E0302, s.trim(), e); 132 } 133 catch (JPAExecutorException e) { 134 throw new CommandException(e); 135 } 136 137 } 138 } 139 140 List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>(); 141 for (CoordinatorActionBean coordAction : actionSet) { 142 coordActions.add(coordAction); 143 } 144 return coordActions; 145 } 146 147 /** 148 * Get the list of actions for given id ranges 149 * 150 * @param jobId coordinator job id 151 * @param scope a comma-separated list of action ranges. The action range is specified with two action numbers separated by '-' 152 * @return the list of all Coordinator actions for action range 153 * @throws CommandException thrown if failed to get coordinator actions by given id range 154 */ 155 public static List<CoordinatorActionBean> getCoordActionsFromIds(String jobId, String scope) throws CommandException { 156 JPAService jpaService = Services.get().get(JPAService.class); 157 ParamChecker.notEmpty(jobId, "jobId"); 158 ParamChecker.notEmpty(scope, "scope"); 159 160 Set<String> actions = new HashSet<String>(); 161 String[] list = scope.split(","); 162 for (String s : list) { 163 s = s.trim(); 164 // An action range is specified with two actions separated by '-' 165 if (s.contains("-")) { 166 String[] range = s.split("-"); 167 // Check the format for action's range 168 if (range.length != 2) { 169 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "', an example of correct format is 1-5"); 170 } 171 int start; 172 int end; 173 //Get the starting and ending action numbers 174 try { 175 start = Integer.parseInt(range[0].trim()); 176 } catch (NumberFormatException ne) { 177 throw new CommandException(ErrorCode.E0302, "could not parse " + range[0].trim() + "into an integer", ne); 178 } 179 try { 180 end = Integer.parseInt(range[1].trim()); 181 } catch (NumberFormatException ne) { 182 throw new CommandException(ErrorCode.E0302, "could not parse " + range[1].trim() + "into an integer", ne); 183 } 184 if (start > end) { 185 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "', starting action" 186 + "number of the range should be less than ending action number, an example will be 1-4"); 187 } 188 // Add the actionIds 189 for (int i = start; i <= end; i++) { 190 actions.add(jobId + "@" + i); 191 } 192 } 193 else { 194 try { 195 Integer.parseInt(s); 196 } 197 catch (NumberFormatException ne) { 198 throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s 199 + "'. Integer only."); 200 } 201 actions.add(jobId + "@" + s); 202 } 203 } 204 // Retrieve the actions using the corresponding actionIds 205 List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>(); 206 for (String id : actions) { 207 CoordinatorActionBean coordAction = null; 208 try { 209 coordAction = jpaService.execute(new CoordActionGetJPAExecutor(id)); 210 } 211 catch (JPAExecutorException je) { 212 if (je.getErrorCode().equals(ErrorCode.E0605)) { //ignore retrieval of non-existent actions in range 213 XLog.getLog(XLogService.class).warn( 214 "Coord action ID num [{0}] not yet materialized. Hence skipping over it for Kill action", 215 id.substring(id.indexOf("@") + 1)); 216 continue; 217 } 218 else { 219 throw new CommandException(je); 220 } 221 } 222 coordActions.add(coordAction); 223 } 224 return coordActions; 225 } 226 227}