/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie;

import org.apache.oozie.util.XLogStreamer;
import org.apache.oozie.service.XLogService;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.wf.CompletedActionXCommand;
import org.apache.oozie.command.wf.DefinitionXCommand;
import org.apache.oozie.command.wf.ExternalIdXCommand;
import org.apache.oozie.command.wf.JobXCommand;
import org.apache.oozie.command.wf.JobsXCommand;
import org.apache.oozie.command.wf.KillXCommand;
import org.apache.oozie.command.wf.ReRunXCommand;
import org.apache.oozie.command.wf.ResumeXCommand;
import org.apache.oozie.command.wf.StartXCommand;
import org.apache.oozie.command.wf.SubmitHiveXCommand;
import org.apache.oozie.command.wf.SubmitHttpXCommand;
import org.apache.oozie.command.wf.SubmitMRXCommand;
import org.apache.oozie.command.wf.SubmitPigXCommand;
import org.apache.oozie.command.wf.SubmitXCommand;
import org.apache.oozie.command.wf.SuspendXCommand;
import org.apache.oozie.command.wf.WorkflowActionInfoXCommand;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;

import java.io.Writer;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.HashSet;
import java.util.StringTokenizer;
import java.util.Map;
import java.util.HashMap;
import java.util.ArrayList;
import java.io.IOException;

/**
 * The DagEngine provides all the DAG engine functionality for WS calls.
 */
public class DagEngine extends BaseEngine {

    private static final int HIGH_PRIORITY = 2;
    private static final XLog LOG = XLog.getLog(DagEngine.class);

    /**
     * Create a system Dag engine, with no user and no group.
     */
    public DagEngine() {
        if (!Services.get().getConf().getBoolean(USE_XCOMMAND, true)) {
            LOG.debug("Oozie DagEngine is not using XCommands.");
        }
        else {
            LOG.debug("Oozie DagEngine is using XCommands.");
        }
    }

    /**
     * Create a Dag engine to perform operations on behalf of a user.
     *
     * @param user user name.
     */
    public DagEngine(String user) {
        this();

        this.user = ParamChecker.notEmpty(user, "user");
    }
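
    // Illustrative usage (not part of the original class): a caller typically obtains a
    // per-user engine and submits a workflow whose application path is set in the job
    // configuration. The user name and path below are placeholders.
    //
    //     DagEngine engine = new DagEngine("someUser");
    //     Configuration conf = new Configuration(false);
    //     conf.set(OozieClient.APP_PATH, "hdfs://namenode/user/someUser/workflow-app");
    //     String jobId = engine.submitJob(conf, true); // submit and start immediately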

    /**
     * Submit a workflow job. <p/> It validates configuration properties.
     *
     * @param conf job configuration.
     * @param startJob indicates if the job should be started or not.
     * @return the job Id.
     * @throws DagEngineException thrown if the job could not be created.
     */
    @Override
    public String submitJob(Configuration conf, boolean startJob) throws DagEngineException {
        validateSubmitConfiguration(conf);

        try {
            String jobId;
            SubmitXCommand submit = new SubmitXCommand(conf);
            jobId = submit.call();
            if (startJob) {
                start(jobId);
            }
            return jobId;
        }
        catch (CommandException ex) {
            throw new DagEngineException(ex);
        }
    }

    /**
     * Submit a workflow job through a coordinator. It validates configuration properties.
     *
     * @param conf job configuration.
     * @param parentId id of the parent of the workflow.
     * @return the job Id.
     * @throws DagEngineException thrown if the job could not be created.
     */
    public String submitJobFromCoordinator(Configuration conf, String parentId) throws DagEngineException {
        validateSubmitConfiguration(conf);
        try {
            String jobId;
            SubmitXCommand submit = new SubmitXCommand(conf, parentId);
            jobId = submit.call();
            start(jobId);
            return jobId;
        }
        catch (CommandException ex) {
            throw new DagEngineException(ex);
        }
    }

    /**
     * Submit a pig/hive/mapreduce job through HTTP.
     * <p/>
     * It validates configuration properties.
     *
     * @param conf job configuration.
     * @param jobType job type - can be "pig", "hive", or "mapreduce".
     * @return the job Id.
     * @throws DagEngineException thrown if the job could not be created.
     */
    public String submitHttpJob(Configuration conf, String jobType) throws DagEngineException {
        validateSubmitConfiguration(conf);

        try {
            String jobId;
            SubmitHttpXCommand submit = null;
            if (jobType.equals("pig")) {
                submit = new SubmitPigXCommand(conf);
            }
            else if (jobType.equals("mapreduce")) {
                submit = new SubmitMRXCommand(conf);
            }
            else if (jobType.equals("hive")) {
                submit = new SubmitHiveXCommand(conf);
            }

            // jobType is expected to be one of the values above; any other value
            // leaves submit null and the call below fails.
            jobId = submit.call();
            start(jobId);
            return jobId;
        }
        catch (CommandException ex) {
            throw new DagEngineException(ex);
        }
    }

    private void validateSubmitConfiguration(Configuration conf) throws DagEngineException {
        if (conf.get(OozieClient.APP_PATH) == null) {
            throw new DagEngineException(ErrorCode.E0401, OozieClient.APP_PATH);
        }
    }
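
    // Illustrative sketch (not part of the original class): submitting a proxy
    // MapReduce job over HTTP. The only property checked here is APP_PATH; any
    // other properties depend on the concrete Submit*XCommand and are omitted.
    // The path and user name are placeholders.
    //
    //     Configuration conf = new Configuration(false);
    //     conf.set(OozieClient.APP_PATH, "hdfs://namenode/user/someUser/app");
    //     String jobId = new DagEngine("someUser").submitHttpJob(conf, "mapreduce");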

    /**
     * Start a job.
     *
     * @param jobId job Id.
     * @throws DagEngineException thrown if the job could not be started.
     */
    @Override
    public void start(String jobId) throws DagEngineException {
        // Changing to synchronous call from asynchronous queuing to prevent the
        // loss of command if the queue is full or the queue is lost in case of
        // failure.
        try {
            new StartXCommand(jobId).call();
        }
        catch (CommandException e) {
            throw new DagEngineException(e);
        }
    }

    /**
     * Resume a job.
     *
     * @param jobId job Id.
     * @throws DagEngineException thrown if the job could not be resumed.
     */
    @Override
    public void resume(String jobId) throws DagEngineException {
        // Changing to synchronous call from asynchronous queuing to prevent the
        // loss of command if the queue is full or the queue is lost in case of
        // failure.
        try {
            new ResumeXCommand(jobId).call();
        }
        catch (CommandException e) {
            throw new DagEngineException(e);
        }
    }

    /**
     * Suspend a job.
     *
     * @param jobId job Id.
     * @throws DagEngineException thrown if the job could not be suspended.
     */
    @Override
    public void suspend(String jobId) throws DagEngineException {
        // Changing to synchronous call from asynchronous queuing to prevent the
        // loss of command if the queue is full or the queue is lost in case of
        // failure.
        try {
            new SuspendXCommand(jobId).call();
        }
        catch (CommandException e) {
            throw new DagEngineException(e);
        }
    }

    /**
     * Kill a job.
     *
     * @param jobId job Id.
     * @throws DagEngineException thrown if the job could not be killed.
     */
    @Override
    public void kill(String jobId) throws DagEngineException {
        // Changing to synchronous call from asynchronous queuing to prevent the
        // loss of command if the queue is full or the queue is lost in case of
        // failure.
        try {
            new KillXCommand(jobId).call();
            LOG.info("User " + user + " killed the WF job " + jobId);
        }
        catch (CommandException e) {
            throw new DagEngineException(e);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
     */
    @Override
    public void change(String jobId, String changeValue) throws DagEngineException {
        // This code should not be reached.
        throw new DagEngineException(ErrorCode.E1017);
    }

    /**
     * Rerun a job.
     *
     * @param jobId job Id to rerun.
     * @param conf configuration information for the rerun.
     * @throws DagEngineException thrown if the job could not be rerun.
     */
    @Override
    public void reRun(String jobId, Configuration conf) throws DagEngineException {
        try {
            validateReRunConfiguration(conf);
            new ReRunXCommand(jobId, conf).call();
            start(jobId);
        }
        catch (CommandException ex) {
            throw new DagEngineException(ex);
        }
    }

    private void validateReRunConfiguration(Configuration conf) throws DagEngineException {
        if (conf.get(OozieClient.APP_PATH) == null) {
            throw new DagEngineException(ErrorCode.E0401, OozieClient.APP_PATH);
        }
        if (conf.get(OozieClient.RERUN_SKIP_NODES) == null && conf.get(OozieClient.RERUN_FAIL_NODES) == null) {
            throw new DagEngineException(ErrorCode.E0401, OozieClient.RERUN_SKIP_NODES + " OR "
                    + OozieClient.RERUN_FAIL_NODES);
        }
        if (conf.get(OozieClient.RERUN_SKIP_NODES) != null && conf.get(OozieClient.RERUN_FAIL_NODES) != null) {
            throw new DagEngineException(ErrorCode.E0404, OozieClient.RERUN_SKIP_NODES + " OR "
                    + OozieClient.RERUN_FAIL_NODES);
        }
    }
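
    // Illustrative sketch (not part of the original class): per validateReRunConfiguration()
    // a rerun configuration must carry the application path and exactly one of the
    // skip-nodes or fail-nodes properties. The values below are placeholders.
    //
    //     Configuration rerunConf = new Configuration(false);
    //     rerunConf.set(OozieClient.APP_PATH, "hdfs://namenode/user/someUser/app");
    //     rerunConf.set(OozieClient.RERUN_FAIL_NODES, "true"); // or RERUN_SKIP_NODES with a node list
    //     engine.reRun(jobId, rerunConf);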

    /**
     * Process an action callback.
     *
     * @param actionId the action Id.
     * @param externalStatus the action external status.
     * @param actionData the action output data, <code>null</code> if none.
     * @throws DagEngineException thrown if the callback could not be processed.
     */
    public void processCallback(String actionId, String externalStatus, Properties actionData)
            throws DagEngineException {
        XLog.Info.get().clearParameter(XLogService.GROUP);
        XLog.Info.get().clearParameter(XLogService.USER);
        XCallable<Void> command = new CompletedActionXCommand(actionId, externalStatus,
                actionData, HIGH_PRIORITY);
        if (!Services.get().get(CallableQueueService.class).queue(command)) {
            LOG.warn(XLog.OPS, "queue is full or system is in SAFEMODE, ignoring callback");
        }
    }

    /**
     * Return the info about a job.
     *
     * @param jobId job Id.
     * @return the workflow job info.
     * @throws DagEngineException thrown if the job info could not be obtained.
     */
    @Override
    public WorkflowJob getJob(String jobId) throws DagEngineException {
        try {
            return new JobXCommand(jobId).call();
        }
        catch (CommandException ex) {
            throw new DagEngineException(ex);
        }
    }

    /**
     * Return the info about a job with an actions subset.
     *
     * @param jobId job Id.
     * @param start starting from this index in the list of actions belonging to the job.
     * @param length number of actions to be returned.
     * @return the workflow job info.
     * @throws DagEngineException thrown if the job info could not be obtained.
     */
    @Override
    public WorkflowJob getJob(String jobId, int start, int length) throws DagEngineException {
        try {
            return new JobXCommand(jobId, start, length).call();
        }
        catch (CommandException ex) {
            throw new DagEngineException(ex);
        }
    }

    /**
     * Return the job definition.
     *
     * @param jobId job Id.
     * @return the job definition.
     * @throws DagEngineException thrown if the job definition could not be obtained.
     */
    @Override
    public String getDefinition(String jobId) throws DagEngineException {
        try {
            return new DefinitionXCommand(jobId).call();
        }
        catch (CommandException ex) {
            throw new DagEngineException(ex);
        }
    }

    /**
     * Stream the log of a job.
     *
     * @param jobId job Id.
     * @param writer writer to stream the log to.
     * @throws IOException thrown if the log cannot be streamed.
     * @throws DagEngineException thrown if there is an error in getting the workflow information for jobId.
     */
    @Override
    public void streamLog(String jobId, Writer writer) throws IOException, DagEngineException {
        XLogStreamer.Filter filter = new XLogStreamer.Filter();
        filter.setParameter(DagXLogInfoService.JOB, jobId);
        WorkflowJob job = getJob(jobId);
        Date lastTime = job.getEndTime();
        if (lastTime == null) {
            lastTime = job.getLastModifiedTime();
        }
        Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), lastTime, writer);
    }

    private static final Set<String> FILTER_NAMES = new HashSet<String>();

    static {
        FILTER_NAMES.add(OozieClient.FILTER_USER);
        FILTER_NAMES.add(OozieClient.FILTER_NAME);
        FILTER_NAMES.add(OozieClient.FILTER_GROUP);
        FILTER_NAMES.add(OozieClient.FILTER_STATUS);
        FILTER_NAMES.add(OozieClient.FILTER_ID);
    }

    /**
     * Validate a jobs filter.
     *
     * @param filter filter to validate.
     * @return the parsed filter.
     * @throws DagEngineException thrown if the filter is invalid.
     */
    protected Map<String, List<String>> parseFilter(String filter) throws DagEngineException {
        Map<String, List<String>> map = new HashMap<String, List<String>>();
        if (filter != null) {
            StringTokenizer st = new StringTokenizer(filter, ";");
            while (st.hasMoreTokens()) {
                String token = st.nextToken();
                if (token.contains("=")) {
                    String[] pair = token.split("=");
                    if (pair.length != 2) {
                        throw new DagEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
                    }
                    if (!FILTER_NAMES.contains(pair[0])) {
                        throw new DagEngineException(ErrorCode.E0420, filter, XLog
                                .format("invalid name [{0}]", pair[0]));
                    }
                    if (pair[0].equals("status")) {
                        try {
                            WorkflowJob.Status.valueOf(pair[1]);
                        }
                        catch (IllegalArgumentException ex) {
                            throw new DagEngineException(ErrorCode.E0420, filter, XLog.format("invalid status [{0}]",
                                    pair[1]));
                        }
                    }
                    List<String> list = map.get(pair[0]);
                    if (list == null) {
                        list = new ArrayList<String>();
                        map.put(pair[0], list);
                    }
                    list.add(pair[1]);
                }
                else {
                    throw new DagEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
                }
            }
        }
        return map;
    }

    /**
     * Return the info about a set of jobs.
     *
     * @param filter job filter. Refer to the {@link org.apache.oozie.client.OozieClient} for the filter syntax.
     * @param start offset, base 1.
     * @param len number of jobs to return.
     * @return job info for all matching jobs, the jobs don't contain node action information.
     * @throws DagEngineException thrown if the jobs info could not be obtained.
     */
    public WorkflowsInfo getJobs(String filter, int start, int len) throws DagEngineException {
        Map<String, List<String>> filterList = parseFilter(filter);
        try {
            return new JobsXCommand(filterList, start, len).call();
        }
        catch (CommandException dce) {
            throw new DagEngineException(dce);
        }
    }
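
    // Illustrative sketch (not part of the original class): filter elements are
    // semicolon-separated name=value pairs, and repeated names accumulate into a list.
    // For example, "user=someUser;status=RUNNING;status=SUCCEEDED" parses to
    // {user=[someUser], status=[RUNNING, SUCCEEDED]}; the user name is a placeholder.
    //
    //     WorkflowsInfo jobs = engine.getJobs("user=someUser;status=RUNNING", 1, 50);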

    /**
     * Return the workflow Job ID for an external ID. <p/> This is a reverse lookup for recovery purposes.
     *
     * @param externalId external ID provided at job submission time.
     * @return the associated workflow job ID if any, <code>null</code> if none.
     * @throws DagEngineException thrown if the lookup could not be done.
     */
    @Override
    public String getJobIdForExternalId(String externalId) throws DagEngineException {
        try {
            return new ExternalIdXCommand(externalId).call();
        }
        catch (CommandException dce) {
            throw new DagEngineException(dce);
        }
    }

    @Override
    public CoordinatorJob getCoordJob(String jobId) throws BaseEngineException {
        throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from DagEngine"));
    }

    @Override
    public CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc)
            throws BaseEngineException {
        throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from DagEngine"));
    }

    public WorkflowActionBean getWorkflowAction(String actionId) throws BaseEngineException {
        try {
            return new WorkflowActionInfoXCommand(actionId).call();
        }
        catch (CommandException ex) {
            throw new BaseEngineException(ex);
        }
    }

    /* (non-Javadoc)
     * @see org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
     */
    @Override
    public String dryRunSubmit(Configuration conf) throws BaseEngineException {
        try {
            SubmitXCommand submit = new SubmitXCommand(true, conf);
            return submit.call();
        }
        catch (CommandException ex) {
            throw new DagEngineException(ex);
        }
    }
}