001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.coord; 020 021import java.text.ParseException; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Date; 025import java.util.HashSet; 026import java.util.List; 027import java.util.Set; 028import java.util.Map; 029import java.util.HashMap; 030import java.util.concurrent.TimeUnit; 031 032import org.apache.commons.lang.StringUtils; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.oozie.CoordinatorActionBean; 035import org.apache.oozie.CoordinatorEngine; 036import org.apache.oozie.ErrorCode; 037import org.apache.oozie.XException; 038import org.apache.oozie.client.OozieClient; 039import org.apache.oozie.client.rest.RestConstants; 040import org.apache.oozie.command.CommandException; 041import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluator; 042import org.apache.oozie.coord.input.logic.InputLogicParser; 043import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; 044import org.apache.oozie.executor.jpa.CoordJobGetActionForNominalTimeJPAExecutor; 045import org.apache.oozie.executor.jpa.JPAExecutorException; 046import org.apache.oozie.service.ConfigurationService; 047import org.apache.oozie.service.JPAService; 048import org.apache.oozie.service.Services; 049import org.apache.oozie.service.XLogService; 050import org.apache.oozie.sla.SLAOperations; 051import org.apache.oozie.util.CoordActionsInDateRange; 052import org.apache.oozie.util.DateUtils; 053import org.apache.oozie.util.Pair; 054import org.apache.oozie.util.ParamChecker; 055import org.apache.oozie.util.XLog; 056import org.apache.oozie.util.XmlUtils; 057import org.jdom.Element; 058import org.jdom.JDOMException; 059 060import com.google.common.annotations.VisibleForTesting; 061 062 063public class CoordUtils { 064 public static final String HADOOP_USER = "user.name"; 065 066 public static String getDoneFlag(Element doneFlagElement) { 067 if (doneFlagElement != null) { 068 return doneFlagElement.getTextTrim(); 069 } 070 else { 071 return CoordELConstants.DEFAULT_DONE_FLAG; 072 } 073 } 074 075 public static Configuration getHadoopConf(Configuration jobConf) { 076 Configuration conf = new Configuration(); 077 ParamChecker.notNull(jobConf, "Configuration to be used for hadoop setup "); 078 String user = ParamChecker.notEmpty(jobConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME); 079 conf.set(HADOOP_USER, user); 080 return conf; 081 } 082 083 /** 084 * Get the list of actions for a given coordinator job 085 * @param rangeType the rerun type (date, action) 086 * @param jobId the coordinator job id 087 * @param scope the date scope or action id scope 088 * @return the list of Coordinator actions 089 * @throws CommandException 090 */ 091 public static List<CoordinatorActionBean> getCoordActions(String rangeType, String jobId, String scope, 092 boolean active) throws CommandException { 093 List<CoordinatorActionBean> coordActions = null; 094 if (rangeType.equals(RestConstants.JOB_COORD_SCOPE_DATE)) { 095 coordActions = CoordUtils.getCoordActionsFromDates(jobId, scope, active); 096 } 097 else if (rangeType.equals(RestConstants.JOB_COORD_SCOPE_ACTION)) { 098 coordActions = CoordUtils.getCoordActionsFromIds(jobId, scope); 099 } 100 return coordActions; 101 } 102 103 /** 104 * Get the list of actions for given date ranges 105 * 106 * @param jobId coordinator job id 107 * @param scope a comma-separated list of date ranges. Each date range element is specified with two dates separated by '::' 108 * @return the list of Coordinator actions for the date range 109 * @throws CommandException thrown if failed to get coordinator actions by given date range 110 */ 111 @VisibleForTesting 112 public static List<CoordinatorActionBean> getCoordActionsFromDates(String jobId, String scope, boolean active) 113 throws CommandException { 114 JPAService jpaService = Services.get().get(JPAService.class); 115 ParamChecker.notEmpty(jobId, "jobId"); 116 ParamChecker.notEmpty(scope, "scope"); 117 118 Set<CoordinatorActionBean> actionSet = new HashSet<CoordinatorActionBean>(); 119 String[] list = scope.split(","); 120 for (String s : list) { 121 s = s.trim(); 122 // A date range is specified with two dates separated by '::' 123 if (s.contains("::")) { 124 List<CoordinatorActionBean> listOfActions; 125 try { 126 // Get list of actions within the range of date 127 listOfActions = CoordActionsInDateRange.getCoordActionsFromDateRange(jobId, s, active); 128 } 129 catch (XException e) { 130 throw new CommandException(e); 131 } 132 actionSet.addAll(listOfActions); 133 } 134 else { 135 try { 136 // Get action for the nominal time 137 Date date = DateUtils.parseDateOozieTZ(s.trim()); 138 CoordinatorActionBean coordAction = jpaService 139 .execute(new CoordJobGetActionForNominalTimeJPAExecutor(jobId, date)); 140 141 if (coordAction != null) { 142 actionSet.add(coordAction); 143 } 144 else { 145 throw new RuntimeException("This should never happen, Coordinator Action shouldn't be null"); 146 } 147 } 148 catch (ParseException e) { 149 throw new CommandException(ErrorCode.E0302, s.trim(), e); 150 } 151 catch (JPAExecutorException e) { 152 if (e.getErrorCode() == ErrorCode.E0605) { 153 XLog.getLog(CoordUtils.class).info("No action for nominal time:" + s + ". Skipping over"); 154 } 155 throw new CommandException(e); 156 } 157 158 } 159 } 160 161 List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>(); 162 for (CoordinatorActionBean coordAction : actionSet) { 163 coordActions.add(coordAction); 164 } 165 return coordActions; 166 } 167 168 public static Set<String> getActionsIds(String jobId, String scope) throws CommandException { 169 ParamChecker.notEmpty(jobId, "jobId"); 170 ParamChecker.notEmpty(scope, "scope"); 171 172 Set<String> actions = new HashSet<String>(); 173 String[] list = scope.split(","); 174 for (String s : list) { 175 s = s.trim(); 176 // An action range is specified with two actions separated by '-' 177 if (s.contains("-")) { 178 String[] range = s.split("-"); 179 // Check the format for action's range 180 if (range.length != 2) { 181 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "', an example of correct format is 1-5"); 182 } 183 int start; 184 int end; 185 //Get the starting and ending action numbers 186 try { 187 start = Integer.parseInt(range[0].trim()); 188 } catch (NumberFormatException ne) { 189 throw new CommandException(ErrorCode.E0302, "could not parse " + range[0].trim() + "into an integer", ne); 190 } 191 try { 192 end = Integer.parseInt(range[1].trim()); 193 } catch (NumberFormatException ne) { 194 throw new CommandException(ErrorCode.E0302, "could not parse " + range[1].trim() + "into an integer", ne); 195 } 196 if (start > end) { 197 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "', starting action" 198 + "number of the range should be less than ending action number, an example will be 1-4"); 199 } 200 // Add the actionIds 201 for (int i = start; i <= end; i++) { 202 actions.add(jobId + "@" + i); 203 } 204 } 205 else { 206 try { 207 Integer.parseInt(s); 208 } 209 catch (NumberFormatException ne) { 210 throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s 211 + "'. Integer only."); 212 } 213 actions.add(jobId + "@" + s); 214 } 215 } 216 return actions; 217 } 218 219 /** 220 * Get the list of actions for given id ranges 221 * 222 * @param jobId coordinator job id 223 * @param scope a comma-separated list of action ranges. The action range is specified with two action numbers separated by '-' 224 * @return the list of all Coordinator actions for action range 225 * @throws CommandException thrown if failed to get coordinator actions by given id range 226 */ 227 @VisibleForTesting 228 public static List<CoordinatorActionBean> getCoordActionsFromIds(String jobId, String scope) throws CommandException { 229 JPAService jpaService = Services.get().get(JPAService.class); 230 Set<String> actions = getActionsIds(jobId, scope); 231 // Retrieve the actions using the corresponding actionIds 232 List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>(); 233 for (String id : actions) { 234 CoordinatorActionBean coordAction = null; 235 try { 236 coordAction = jpaService.execute(new CoordActionGetJPAExecutor(id)); 237 } 238 catch (JPAExecutorException je) { 239 if (je.getErrorCode().equals(ErrorCode.E0605)) { //ignore retrieval of non-existent actions in range 240 XLog.getLog(XLogService.class).warn( 241 "Coord action ID num [{0}] not yet materialized. Hence skipping over it for Kill action", 242 id.substring(id.indexOf("@") + 1)); 243 continue; 244 } 245 else { 246 throw new CommandException(je); 247 } 248 } 249 coordActions.add(coordAction); 250 } 251 return coordActions; 252 } 253 254 /** 255 * Check if sla alert is disabled for action. 256 * @param actionBean 257 * @param coordName 258 * @param jobConf 259 * @return 260 * @throws ParseException 261 */ 262 public static boolean isSlaAlertDisabled(CoordinatorActionBean actionBean, String coordName, Configuration jobConf) 263 throws ParseException { 264 265 int disableSlaNotificationOlderThan = jobConf.getInt(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN, 266 ConfigurationService.getInt(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN)); 267 268 if (disableSlaNotificationOlderThan > 0) { 269 // Disable alert for catchup jobs 270 long timeDiffinHrs = TimeUnit.MILLISECONDS.toHours(new Date().getTime() 271 - actionBean.getNominalTime().getTime()); 272 if (timeDiffinHrs > jobConf.getLong(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN, 273 ConfigurationService.getLong(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN))) { 274 return true; 275 } 276 } 277 278 boolean disableAlert = false; 279 if (jobConf.get(OozieClient.SLA_DISABLE_ALERT_COORD) != null) { 280 String coords = jobConf.get(OozieClient.SLA_DISABLE_ALERT_COORD); 281 Set<String> coordsToDisableFor = new HashSet<String>(Arrays.asList(coords.split(","))); 282 if (coordsToDisableFor.contains(coordName)) { 283 return true; 284 } 285 if (coordsToDisableFor.contains(actionBean.getJobId())) { 286 return true; 287 } 288 } 289 290 // Check if sla alert is disabled for that action 291 if (!StringUtils.isEmpty(jobConf.get(OozieClient.SLA_DISABLE_ALERT)) 292 && getCoordActionSLAAlertStatus(actionBean, coordName, jobConf, OozieClient.SLA_DISABLE_ALERT)) { 293 return true; 294 } 295 296 // Check if sla alert is enabled for that action 297 if (!StringUtils.isEmpty(jobConf.get(OozieClient.SLA_ENABLE_ALERT)) 298 && getCoordActionSLAAlertStatus(actionBean, coordName, jobConf, OozieClient.SLA_ENABLE_ALERT)) { 299 return false; 300 } 301 302 return disableAlert; 303 } 304 305 /** 306 * Get coord action SLA alert status. 307 * @param actionBean 308 * @param coordName 309 * @param jobConf 310 * @param slaAlertType 311 * @return 312 * @throws ParseException 313 */ 314 private static boolean getCoordActionSLAAlertStatus(CoordinatorActionBean actionBean, String coordName, 315 Configuration jobConf, String slaAlertType) throws ParseException { 316 String slaAlertList; 317 318 if (!StringUtils.isEmpty(jobConf.get(slaAlertType))) { 319 slaAlertList = jobConf.get(slaAlertType); 320 // check if ALL or date/action-num range 321 if (slaAlertList.equalsIgnoreCase(SLAOperations.ALL_VALUE)) { 322 return true; 323 } 324 String[] values = slaAlertList.split(","); 325 for (String value : values) { 326 value = value.trim(); 327 if (value.contains("::")) { 328 String[] datesInRange = value.split("::"); 329 Date start = DateUtils.parseDateOozieTZ(datesInRange[0].trim()); 330 Date end = DateUtils.parseDateOozieTZ(datesInRange[1].trim()); 331 // check if nominal time in this range 332 if (actionBean.getNominalTime().compareTo(start) >= 0 333 || actionBean.getNominalTime().compareTo(end) <= 0) { 334 return true; 335 } 336 } 337 else if (value.contains("-")) { 338 String[] actionsInRange = value.split("-"); 339 int start = Integer.parseInt(actionsInRange[0].trim()); 340 int end = Integer.parseInt(actionsInRange[1].trim()); 341 // check if action number in this range 342 if (actionBean.getActionNumber() >= start || actionBean.getActionNumber() <= end) { 343 return true; 344 } 345 } 346 else { 347 int actionNumber = Integer.parseInt(value.trim()); 348 if (actionBean.getActionNumber() == actionNumber) { 349 return true; 350 } 351 } 352 } 353 } 354 return false; 355 } 356 357 // Form the where clause to filter by status values 358 public static Map<String, Object> getWhereClause(StringBuilder sb, Map<Pair<String, CoordinatorEngine.FILTER_COMPARATORS>, 359 List<Object>> filterMap) { 360 Map<String, Object> params = new HashMap<String, Object>(); 361 int pcnt= 1; 362 for (Map.Entry<Pair<String, CoordinatorEngine.FILTER_COMPARATORS>, List<Object>> filter : filterMap.entrySet()) { 363 String field = filter.getKey().getFist(); 364 CoordinatorEngine.FILTER_COMPARATORS comp = filter.getKey().getSecond(); 365 String sqlField; 366 if (field.equals(OozieClient.FILTER_STATUS)) { 367 sqlField = "a.statusStr"; 368 } else if (field.equals(OozieClient.FILTER_NOMINAL_TIME)) { 369 sqlField = "a.nominalTimestamp"; 370 } else { 371 throw new IllegalArgumentException("Invalid filter key " + field); 372 } 373 374 sb.append(" and ").append(sqlField).append(" "); 375 switch (comp) { 376 case EQUALS: 377 sb.append("IN ("); 378 params.putAll(appendParams(sb, filter.getValue(), pcnt)); 379 sb.append(")"); 380 break; 381 382 case NOT_EQUALS: 383 sb.append("NOT IN ("); 384 params.putAll(appendParams(sb, filter.getValue(), pcnt)); 385 sb.append(")"); 386 break; 387 388 case GREATER: 389 case GREATER_EQUAL: 390 case LESSTHAN: 391 case LESSTHAN_EQUAL: 392 if (filter.getValue().size() != 1) { 393 throw new IllegalArgumentException(field + comp.getSign() + " can't have more than 1 values"); 394 } 395 396 sb.append(comp.getSign()).append(" "); 397 params.putAll(appendParams(sb, filter.getValue(), pcnt)); 398 break; 399 } 400 401 pcnt += filter.getValue().size(); 402 } 403 sb.append(" "); 404 return params; 405 } 406 407 private static Map<String, Object> appendParams(StringBuilder sb, List<Object> value, int sindex) { 408 Map<String, Object> params = new HashMap<String, Object>(); 409 boolean first = true; 410 for (Object val : value) { 411 String pname = "p" + sindex++; 412 params.put(pname, val); 413 if (!first) { 414 sb.append(", "); 415 } 416 sb.append(':').append(pname); 417 first = false; 418 } 419 return params; 420 } 421 422 public static boolean isInputLogicSpecified(String actionXml) throws JDOMException { 423 return isInputLogicSpecified(XmlUtils.parseXml(actionXml)); 424 } 425 426 public static boolean isInputLogicSpecified(Element eAction) throws JDOMException { 427 return eAction.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, eAction.getNamespace()) != null; 428 } 429 430 public static String getInputLogic(String actionXml) throws JDOMException { 431 return getInputLogic(XmlUtils.parseXml(actionXml)); 432 } 433 434 public static String getInputLogic(Element actionXml) throws JDOMException { 435 return new InputLogicParser().parse(actionXml.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, 436 actionXml.getNamespace())); 437 } 438 439}