001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import java.io.IOException; 021 import java.io.StringReader; 022 import java.util.ArrayList; 023 import java.util.Date; 024 import java.util.HashSet; 025 import java.util.List; 026 import java.util.Set; 027 028 import org.apache.hadoop.conf.Configuration; 029 import org.apache.hadoop.fs.FileSystem; 030 import org.apache.hadoop.fs.Path; 031 import org.apache.oozie.CoordinatorActionBean; 032 import org.apache.oozie.CoordinatorActionInfo; 033 import org.apache.oozie.CoordinatorJobBean; 034 import org.apache.oozie.ErrorCode; 035 import org.apache.oozie.XException; 036 import org.apache.oozie.client.CoordinatorAction; 037 import org.apache.oozie.client.CoordinatorJob; 038 import org.apache.oozie.client.SLAEvent.SlaAppType; 039 import org.apache.oozie.client.rest.RestConstants; 040 import org.apache.oozie.command.CommandException; 041 import org.apache.oozie.coord.CoordELFunctions; 042 import org.apache.oozie.service.HadoopAccessorService; 043 import org.apache.oozie.service.Services; 044 import org.apache.oozie.store.CoordinatorStore; 045 import org.apache.oozie.store.StoreException; 046 import org.apache.oozie.util.DateUtils; 047 import org.apache.oozie.util.ParamChecker; 048 import org.apache.oozie.util.XConfiguration; 049 import org.apache.oozie.util.XLog; 050 import org.apache.oozie.util.XmlUtils; 051 import org.apache.oozie.util.db.SLADbOperations; 052 import org.jdom.Element; 053 import org.jdom.JDOMException; 054 055 public class CoordRerunCommand extends CoordinatorCommand<CoordinatorActionInfo> { 056 057 private String jobId; 058 private String rerunType; 059 private String scope; 060 private boolean refresh; 061 private boolean noCleanup; 062 private final XLog log = XLog.getLog(getClass()); 063 064 public CoordRerunCommand(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) { 065 super("coord_rerun", "coord_rerun", 1, XLog.STD); 066 this.jobId = ParamChecker.notEmpty(jobId, "jobId"); 067 this.rerunType = ParamChecker.notEmpty(rerunType, "rerunType"); 068 this.scope = ParamChecker.notEmpty(scope, "scope"); 069 this.refresh = refresh; 070 this.noCleanup = noCleanup; 071 } 072 073 @Override 074 protected CoordinatorActionInfo call(CoordinatorStore store) throws StoreException, CommandException { 075 try { 076 CoordinatorJobBean coordJob = store.getCoordinatorJob(jobId, false); 077 CoordinatorActionInfo coordInfo = null; 078 setLogInfo(coordJob); 079 if (coordJob.getStatus() != CoordinatorJob.Status.KILLED 080 && coordJob.getStatus() != CoordinatorJob.Status.FAILED) { 081 incrJobCounter(1); 082 083 List<CoordinatorActionBean> coordActions; 084 if (rerunType.equals(RestConstants.JOB_COORD_RERUN_DATE)) { 085 coordActions = getCoordActionsFromDates(jobId, scope, store); 086 } 087 else if (rerunType.equals(RestConstants.JOB_COORD_RERUN_ACTION)) { 088 coordActions = getCoordActionsFromIds(jobId, scope, store); 089 } 090 else { 091 throw new CommandException(ErrorCode.E1018, "date or action expected."); 092 } 093 if (checkAllActionsRunnable(coordActions)) { 094 Configuration conf = new XConfiguration(new StringReader(coordJob.getConf())); 095 for (CoordinatorActionBean coordAction : coordActions) { 096 String actionXml = coordAction.getActionXml(); 097 if (!noCleanup) { 098 Element eAction = XmlUtils.parseXml(actionXml); 099 cleanupOutputEvents(eAction, coordJob.getUser(), coordJob.getGroup(), conf); 100 } 101 if (refresh) { 102 refreshAction(coordJob, coordAction, store); 103 } 104 updateAction(coordJob, coordAction, actionXml, store); 105 106 // TODO: time 100s should be configurable 107 queueCallable(new CoordActionNotification(coordAction), 100); 108 queueCallable(new CoordActionInputCheckCommand(coordAction.getId()), 100); 109 } 110 } 111 else { 112 throw new CommandException(ErrorCode.E1018, "part or all actions are not eligible to rerun!"); 113 } 114 coordInfo = new CoordinatorActionInfo(coordActions); 115 } 116 else { 117 log.info("CoordRerunCommand is not able to run, job status=" + coordJob.getStatus() + ", jobid=" 118 + jobId); 119 throw new CommandException(ErrorCode.E1018, 120 "coordinator job is killed or failed so all actions are not eligible to rerun!"); 121 } 122 return coordInfo; 123 } 124 catch (XException xex) { 125 throw new CommandException(xex); 126 } 127 catch (JDOMException jex) { 128 throw new CommandException(ErrorCode.E0700, jex); 129 } 130 catch (Exception ex) { 131 throw new CommandException(ErrorCode.E1018, ex); 132 } 133 } 134 135 /** 136 * Get the list of actions for given id ranges 137 * 138 * @param jobId 139 * @param scope 140 * @param store 141 * @return the list of all actions to rerun 142 * @throws CommandException 143 * @throws StoreException 144 */ 145 private List<CoordinatorActionBean> getCoordActionsFromIds(String jobId, String scope, CoordinatorStore store) 146 throws CommandException, StoreException { 147 ParamChecker.notEmpty(jobId, "jobId"); 148 ParamChecker.notEmpty(scope, "scope"); 149 150 Set<String> actions = new HashSet<String>(); 151 String[] list = scope.split(","); 152 for (String s : list) { 153 s = s.trim(); 154 if (s.contains("-")) { 155 String[] range = s.split("-"); 156 if (range.length != 2) { 157 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'"); 158 } 159 int start; 160 int end; 161 try { 162 start = Integer.parseInt(range[0].trim()); 163 end = Integer.parseInt(range[1].trim()); 164 if (start > end) { 165 throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'"); 166 } 167 } 168 catch (NumberFormatException ne) { 169 throw new CommandException(ErrorCode.E0302, ne); 170 } 171 for (int i = start; i <= end; i++) { 172 actions.add(jobId + "@" + i); 173 } 174 } 175 else { 176 try { 177 Integer.parseInt(s); 178 } 179 catch (NumberFormatException ne) { 180 throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s 181 + "'. Integer only."); 182 } 183 actions.add(jobId + "@" + s); 184 } 185 } 186 187 List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>(); 188 for (String id : actions) { 189 CoordinatorActionBean coordAction = store.getCoordinatorAction(id, false); 190 coordActions.add(coordAction); 191 log.debug("Rerun coordinator for actionId='" + id + "'"); 192 } 193 return coordActions; 194 } 195 196 /** 197 * Get the list of actions for given date ranges 198 * 199 * @param jobId 200 * @param scope 201 * @param store 202 * @return the list of dates to rerun 203 * @throws CommandException 204 * @throws StoreException 205 */ 206 private List<CoordinatorActionBean> getCoordActionsFromDates(String jobId, String scope, CoordinatorStore store) 207 throws CommandException, StoreException { 208 ParamChecker.notEmpty(jobId, "jobId"); 209 ParamChecker.notEmpty(scope, "scope"); 210 211 Set<CoordinatorActionBean> actionSet = new HashSet<CoordinatorActionBean>(); 212 String[] list = scope.split(","); 213 for (String s : list) { 214 s = s.trim(); 215 if (s.contains("::")) { 216 String[] dateRange = s.split("::"); 217 if (dateRange.length != 2) { 218 throw new CommandException(ErrorCode.E0302, "format is wrong for date's range '" + s + "'"); 219 } 220 Date start; 221 Date end; 222 try { 223 start = DateUtils.parseDateUTC(dateRange[0].trim()); 224 end = DateUtils.parseDateUTC(dateRange[1].trim()); 225 if (start.after(end)) { 226 throw new CommandException(ErrorCode.E0302, "start date is older than end date: '" + s + "'"); 227 } 228 } 229 catch (Exception e) { 230 throw new CommandException(ErrorCode.E0302, e); 231 } 232 233 List<CoordinatorActionBean> listOfActions = getActionIdsFromDateRange(jobId, start, end, store); 234 actionSet.addAll(listOfActions); 235 } 236 else { 237 Date date; 238 try { 239 date = DateUtils.parseDateUTC(s.trim()); 240 } 241 catch (Exception e) { 242 throw new CommandException(ErrorCode.E0302, e); 243 } 244 245 CoordinatorActionBean coordAction = store.getCoordActionForNominalTime(jobId, date); 246 actionSet.add(coordAction); 247 } 248 } 249 250 List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>(); 251 for (CoordinatorActionBean coordAction : actionSet) { 252 coordActions.add(coordAction); 253 log.debug("Rerun coordinator for actionId='" + coordAction.getId() + "'"); 254 } 255 return coordActions; 256 } 257 258 private List<CoordinatorActionBean> getActionIdsFromDateRange(String jobId, Date start, Date end, 259 CoordinatorStore store) 260 throws StoreException { 261 List<CoordinatorActionBean> list = store.getCoordActionsForDates(jobId, start, end); 262 return list; 263 } 264 265 /** 266 * Check if all given actions are eligible to rerun. 267 * 268 * @param actions list of CoordinatorActionBean 269 * @return true if all actions are eligible to rerun 270 */ 271 private boolean checkAllActionsRunnable(List<CoordinatorActionBean> coordActions) { 272 for (CoordinatorActionBean coordAction : coordActions) { 273 if (!coordAction.isTerminalStatus()) { 274 return false; 275 } 276 } 277 return true; 278 } 279 280 /** 281 * Cleanup output-events directories 282 * 283 * @param eAction 284 * @param workflow 285 * @param action 286 */ 287 @SuppressWarnings("unchecked") 288 private void cleanupOutputEvents(Element eAction, String user, String group, Configuration conf) { 289 Element outputList = eAction.getChild("output-events", eAction.getNamespace()); 290 if (outputList != null) { 291 for (Element data : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) { 292 if (data.getChild("uris", data.getNamespace()) != null) { 293 String uris = data.getChild("uris", data.getNamespace()).getTextTrim(); 294 if (uris != null) { 295 String[] uriArr = uris.split(CoordELFunctions.INSTANCE_SEPARATOR); 296 for (String uri : uriArr) { 297 Path path = new Path(uri); 298 try { 299 FileSystem fs = Services.get().get(HadoopAccessorService.class). 300 createFileSystem(user, group, path.toUri(), conf); 301 if (fs.exists(path)) { 302 if (!fs.delete(path, true)) { 303 throw new IOException(); 304 } 305 } 306 log.debug("Cleanup the output dir " + path); 307 } 308 catch (Exception ex) { 309 log.warn("Failed to cleanup the output dir " + uri, ex); 310 } 311 } 312 } 313 314 } 315 } 316 } 317 else { 318 log.info("No output-events defined in coordinator xml. Therefore nothing to cleanup"); 319 } 320 } 321 322 /** 323 * Refresh an Action 324 * 325 * @param coordJob 326 * @param coordAction 327 * @param store 328 * @throws Exception 329 */ 330 private void refreshAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction, CoordinatorStore store) 331 throws Exception { 332 Configuration jobConf = null; 333 try { 334 jobConf = new XConfiguration(new StringReader(coordJob.getConf())); 335 } 336 catch (IOException ioe) { 337 log.warn("Configuration parse error. read from DB :" + coordJob.getConf(), ioe); 338 throw new CommandException(ErrorCode.E1005, ioe); 339 } 340 String jobXml = coordJob.getJobXml(); 341 Element eJob = XmlUtils.parseXml(jobXml); 342 Date actualTime = new Date(); 343 String actionXml = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), coordAction 344 .getNominalTime(), actualTime, coordAction.getActionNumber(), jobConf, coordAction); 345 log.debug("Refresh Action actionId=" + coordAction.getId() + ", actionXml=" 346 + XmlUtils.prettyPrint(actionXml).toString()); 347 coordAction.setActionXml(actionXml); 348 } 349 350 /** 351 * Update an Action into database table 352 * 353 * @param coordJob 354 * @param coordAction 355 * @param actionXml 356 * @param store 357 * @throws Exception 358 */ 359 private void updateAction(CoordinatorJobBean coordJob, CoordinatorActionBean coordAction, String actionXml, 360 CoordinatorStore store) throws Exception { 361 log.debug("updateAction for actionId=" + coordAction.getId()); 362 coordAction.setStatus(CoordinatorAction.Status.WAITING); 363 coordAction.setExternalId(""); 364 coordAction.setExternalStatus(""); 365 coordAction.setRerunTime(new Date()); 366 store.updateCoordinatorAction(coordAction); 367 writeActionRegistration(coordAction.getActionXml(), coordAction, store, coordJob.getUser(), coordJob.getGroup()); 368 } 369 370 /** 371 * Create SLA RegistrationEvent 372 * 373 * @param actionXml 374 * @param actionBean 375 * @param store 376 * @param user 377 * @param group 378 * @throws Exception 379 */ 380 private void writeActionRegistration(String actionXml, CoordinatorActionBean actionBean, CoordinatorStore store, 381 String user, String group) 382 throws Exception { 383 Element eAction = XmlUtils.parseXml(actionXml); 384 Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")); 385 SLADbOperations.writeSlaRegistrationEvent(eSla, store, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, user, 386 group); 387 } 388 389 @Override 390 protected CoordinatorActionInfo execute(CoordinatorStore store) throws StoreException, CommandException { 391 log.info("STARTED CoordRerunCommand for jobId=" + jobId + ", scope=" + scope); 392 CoordinatorActionInfo coordInfo = null; 393 try { 394 if (lock(jobId)) { 395 coordInfo = call(store); 396 } 397 else { 398 queueCallable(new CoordResumeCommand(jobId), LOCK_FAILURE_REQUEUE_INTERVAL); 399 log.warn("CoordRerunCommand lock was not acquired - " + " failed " + jobId + ". Requeing the same."); 400 } 401 } 402 catch (InterruptedException e) { 403 queueCallable(new CoordResumeCommand(jobId), LOCK_FAILURE_REQUEUE_INTERVAL); 404 log.warn("CoordRerunCommand lock acquiring failed " + " with exception " + e.getMessage() + " for job id " 405 + jobId + ". Requeing the same."); 406 } 407 finally { 408 log.info("ENDED CoordRerunCommand for jobId=" + jobId + ", scope=" + scope); 409 } 410 return coordInfo; 411 } 412 413 }