/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie;

import java.io.IOException;
import java.io.Writer;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.StringTokenizer;

import javax.servlet.ServletException;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.rest.BulkResponseImpl;
import org.apache.oozie.command.BulkJobsXCommand;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.OperationType;
import org.apache.oozie.command.bundle.BulkBundleXCommand;
import org.apache.oozie.command.bundle.BundleJobChangeXCommand;
import org.apache.oozie.command.bundle.BundleJobResumeXCommand;
import org.apache.oozie.command.bundle.BundleJobSuspendXCommand;
import org.apache.oozie.command.bundle.BundleJobXCommand;
import org.apache.oozie.command.bundle.BundleJobsXCommand;
import org.apache.oozie.command.bundle.BundleKillXCommand;
import org.apache.oozie.command.bundle.BundleRerunXCommand;
import org.apache.oozie.command.bundle.BundleSLAAlertsDisableXCommand;
import org.apache.oozie.command.bundle.BundleSLAAlertsEnableXCommand;
import org.apache.oozie.command.bundle.BundleSLAChangeXCommand;
import org.apache.oozie.command.bundle.BundleStartXCommand;
import org.apache.oozie.command.bundle.BundleSubmitXCommand;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.JobsFilterUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XLogAuditFilter;
import org.apache.oozie.util.XLogFilter;
import org.apache.oozie.util.XLogUserFilterParam;

import com.google.common.annotations.VisibleForTesting;

/**
 * Engine facade for bundle jobs: each operation builds the matching bundle X-command, runs it,
 * and rethrows command failures as {@link BundleEngineException} (or {@link BaseEngineException}
 * where the signature says so).
 * <p>
 * Workflow- and coordinator-specific lookups inherited from {@link BaseEngine} are not supported
 * here and fail with {@link ErrorCode#E0301}.
 */
public class BundleEngine extends BaseEngine {
    /**
     * Create a system Bundle engine, with no user and no group.
     */
    public BundleEngine() {
    }

    /**
     * Create a Bundle engine to perform operations on behalf of a user.
     *
     * @param user user name; validated with {@link ParamChecker#notEmpty(String, String)}.
     */
    public BundleEngine(String user) {
        this.user = ParamChecker.notEmpty(user, "user");
    }

    /**
     * Change a bundle job by running a {@link BundleJobChangeXCommand}.
     *
     * @param jobId bundle job id
     * @param changeValue the change specification handed to the command unchanged
     * @throws BundleEngineException thrown if the command fails
     * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
     */
    @Override
    public void change(String jobId, String changeValue) throws BundleEngineException {
        try {
            new BundleJobChangeXCommand(jobId, changeValue).call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Run a {@link BundleSubmitXCommand} constructed with its first argument set to {@code true}
     * (presumably the dry-run flag - confirm against the command) and return what it yields.
     *
     * @param conf job configuration
     * @return the value returned by the submit command
     * @throws BundleEngineException thrown if the command fails
     * @see org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
     */
    @Override
    public String dryRunSubmit(Configuration conf) throws BundleEngineException {
        BundleSubmitXCommand submit = new BundleSubmitXCommand(true, conf);
        try {
            return submit.call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Not supported by the bundle engine.
     *
     * @param jobId ignored
     * @return never returns normally
     * @throws BundleEngineException always, wrapping {@link ErrorCode#E0301}
     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
     */
    @Override
    public CoordinatorJob getCoordJob(String jobId) throws BundleEngineException {
        throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from BundleEngine"));
    }

    /**
     * Load a bundle job by running a {@link BundleJobXCommand}.
     *
     * @param jobId bundle job id
     * @return the bundle job bean returned by the command
     * @throws BundleEngineException thrown if the command fails
     */
    public BundleJobBean getBundleJob(String jobId) throws BundleEngineException {
        try {
            return new BundleJobXCommand(jobId).call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Not supported by the bundle engine.
     *
     * @param jobId ignored
     * @param filter ignored
     * @param start ignored
     * @param length ignored
     * @param desc ignored
     * @return never returns normally
     * @throws BundleEngineException always, wrapping {@link ErrorCode#E0301}
     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, java.lang.String, int, int, boolean)
     */
    @Override
    public CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc)
            throws BundleEngineException {
        throw new BundleEngineException(new XException(ErrorCode.E0301,
                "cannot get a coordinator job from BundleEngine"));
    }

    /**
     * Return the original job XML of a bundle job.
     *
     * @param jobId bundle job id
     * @return the value of {@code getOrigJobXml()} on the loaded job bean
     * @throws BundleEngineException thrown if the job cannot be loaded
     * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
     */
    @Override
    public String getDefinition(String jobId) throws BundleEngineException {
        BundleJobBean job;
        try {
            job = new BundleJobXCommand(jobId).call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
        return job.getOrigJobXml();
    }

    /**
     * Not supported by the bundle engine.
     *
     * @param jobId ignored
     * @return never returns normally
     * @throws BundleEngineException always, wrapping {@link ErrorCode#E0301}
     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
     */
    @Override
    public WorkflowJob getJob(String jobId) throws BundleEngineException {
        throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
    }

    /**
     * Not supported by the bundle engine.
     *
     * @param jobId ignored
     * @param start ignored
     * @param length ignored
     * @return never returns normally
     * @throws BundleEngineException always, wrapping {@link ErrorCode#E0301}
     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
     */
    @Override
    public WorkflowJob getJob(String jobId, int start, int length) throws BundleEngineException {
        throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
    }

    /**
     * External-id lookup is not implemented for bundles.
     *
     * @param externalId ignored
     * @return always {@code null}
     * @throws BundleEngineException declared for interface compatibility; never thrown here
     * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
     */
    @Override
    public String getJobIdForExternalId(String externalId) throws BundleEngineException {
        return null;
    }

    /**
     * Kill a bundle job by running a {@link BundleKillXCommand}.
     *
     * @param jobId bundle job id
     * @throws BundleEngineException thrown if the command fails
     * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
     */
    @Override
    public void kill(String jobId) throws BundleEngineException {
        try {
            new BundleKillXCommand(jobId).call();
        }
        catch (CommandException e) {
            throw new BundleEngineException(e);
        }
    }

    /**
     * Not supported by the bundle engine; use
     * {@link #reRun(String, String, String, boolean, boolean)} instead.
     *
     * @param jobId ignored
     * @param conf ignored
     * @throws BundleEngineException always, wrapping {@link ErrorCode#E0301}
     * @see org.apache.oozie.BaseEngine#reRun(java.lang.String, org.apache.hadoop.conf.Configuration)
     */
    @Override
    @Deprecated
    public void reRun(String jobId, Configuration conf) throws BundleEngineException {
        throw new BundleEngineException(new XException(ErrorCode.E0301, "rerun"));
    }

    /**
     * Rerun Bundle actions for given rerunType
     *
     * @param jobId bundle job id
     * @param coordScope the rerun scope for coordinator job names separated by ","
     * @param dateScope the rerun scope for coordinator nominal times separated by ","
     * @param refresh true if user wants to refresh input/output dataset urls
     * @param noCleanup false if user wants to cleanup output events for given rerun actions
     * @throws BaseEngineException thrown if failed to rerun
     */
    public void reRun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup)
            throws BaseEngineException {
        try {
            new BundleRerunXCommand(jobId, coordScope, dateScope, refresh, noCleanup).call();
        }
        catch (CommandException ex) {
            throw new BaseEngineException(ex);
        }
    }

    /**
     * Resume a bundle job by running a {@link BundleJobResumeXCommand}.
     *
     * @param jobId bundle job id
     * @throws BundleEngineException thrown if the command fails
     * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
     */
    @Override
    public void resume(String jobId) throws BundleEngineException {
        BundleJobResumeXCommand resume = new BundleJobResumeXCommand(jobId);
        try {
            resume.call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Start a bundle job by running a {@link BundleStartXCommand}.
     *
     * @param jobId bundle job id
     * @throws BundleEngineException thrown if the command fails
     * @see org.apache.oozie.BaseEngine#start(java.lang.String)
     */
    @Override
    public void start(String jobId) throws BundleEngineException {
        try {
            new BundleStartXCommand(jobId).call();
        }
        catch (CommandException e) {
            throw new BundleEngineException(e);
        }
    }

    /**
     * Stream the job log ({@code LOG_TYPE.LOG}) of a bundle job to the writer.
     *
     * @param jobId bundle job id
     * @param writer destination for the log
     * @param params request parameters used to build the user log filter
     * @throws IOException thrown if the log cannot be streamed; any failure is wrapped in it
     * @throws BundleEngineException declared by the contract
     */
    @Override
    public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
            BundleEngineException {
        streamJobLog(jobId, writer, params, LOG_TYPE.LOG);
    }

    /**
     * Stream the error log ({@code LOG_TYPE.ERROR_LOG}) of a bundle job to the writer.
     *
     * @param jobId bundle job id
     * @param writer destination for the log
     * @param params request parameters used to build the user log filter
     * @throws IOException thrown if the log cannot be streamed; any failure is wrapped in it
     * @throws BundleEngineException declared by the contract
     */
    @Override
    public void streamErrorLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
            BundleEngineException {
        streamJobLog(jobId, writer, params, LOG_TYPE.ERROR_LOG);
    }

    /**
     * Stream the audit log ({@code LOG_TYPE.AUDIT_LOG}) of a bundle job to the writer, filtered
     * through an {@link XLogAuditFilter}.
     *
     * @param jobId bundle job id
     * @param writer destination for the log
     * @param params request parameters used to build the user log filter
     * @throws IOException thrown if the log cannot be streamed or the filter cannot be built
     * @throws BundleEngineException declared by the contract
     */
    @Override
    public void streamAuditLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
            BundleEngineException {
        try {
            streamJobLog(new XLogAuditFilter(new XLogUserFilterParam(params)), jobId, writer, params, LOG_TYPE.AUDIT_LOG);
        }
        catch (CommandException e) {
            throw new IOException(e);
        }
    }

    /**
     * Build a plain {@link XLogFilter} from the request parameters and stream the requested log.
     * Every failure, including one raised while building the filter, surfaces as an
     * {@link IOException}.
     */
    private void streamJobLog(String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
            throws IOException, BundleEngineException {
        try {
            streamJobLog(new XLogFilter(new XLogUserFilterParam(params)), jobId, writer, params, logType);
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    /**
     * Load the bundle job, derive the log time window and delegate to {@code fetchLog}.
     * The window starts at the job's creation time and ends at its last-modified time when the
     * job is in a terminal status, otherwise (or when that time is unset) at the current time.
     */
    private void streamJobLog(XLogFilter filter, String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
            throws IOException, BundleEngineException {
        try {
            filter.setParameter(DagXLogInfoService.JOB, jobId);
            BundleJobBean job = new BundleJobXCommand(jobId).call();
            Date lastTime = null;
            if (job.isTerminalStatus()) {
                lastTime = job.getLastModifiedTime();
            }
            if (lastTime == null) {
                lastTime = new Date();
            }
            fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType);
        }
        catch (Exception ex) {
            throw new IOException(ex);
        }
    }

    /**
     * Submit a bundle job and optionally start it right away.
     *
     * @param conf job configuration
     * @param startJob true to call {@link #start(String)} on the new job after submission
     * @return the id returned by the submit command
     * @throws BundleEngineException thrown if submission or start fails
     * @see org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean)
     */
    @Override
    public String submitJob(Configuration conf, boolean startJob) throws BundleEngineException {
        try {
            String jobId = new BundleSubmitXCommand(conf).call();

            if (startJob) {
                start(jobId);
            }
            return jobId;
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Suspend a bundle job by running a {@link BundleJobSuspendXCommand}.
     *
     * @param jobId bundle job id
     * @throws BundleEngineException thrown if the command fails
     * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
     */
    @Override
    public void suspend(String jobId) throws BundleEngineException {
        BundleJobSuspendXCommand suspend = new BundleJobSuspendXCommand(jobId);
        try {
            suspend.call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Get bundle jobs
     *
     * @param filter the filter string
     * @param start start location for paging
     * @param len total length to get
     * @return bundle job info
     * @throws BundleEngineException thrown if failed to get bundle job info
     */
    public BundleJobInfo getBundleJobs(String filter, int start, int len) throws BundleEngineException {
        Map<String, List<String>> filterList = parseFilter(filter);

        try {
            return new BundleJobsXCommand(filterList, start, len).call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Parse filter string to a map with key = filter name and values = filter values
     *
     * @param filter the filter string
     * @return filter key and value map
     * @throws BundleEngineException thrown if failed to parse filter string
     */
    @VisibleForTesting
    Map<String, List<String>> parseFilter(String filter) throws BundleEngineException {
        try {
            return JobsFilterUtils.parseFilter(filter);
        }
        catch (ServletException ex) {
            throw new BundleEngineException(ErrorCode.E0420, filter, ex.getMessage());
        }
    }

    /**
     * Get bulk job response
     *
     * @param bulkFilter the filter string
     * @param start start location for paging
     * @param len total length to get
     * @return bulk job info
     * @throws BundleEngineException thrown if failed to get bulk job info
     */
    public BulkResponseInfo getBulkJobs(String bulkFilter, int start, int len) throws BundleEngineException {
        Map<String, List<String>> bulkRequestMap = parseBulkFilter(bulkFilter);
        try {
            return new BulkJobsXCommand(bulkRequestMap, start, len).call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Parse a bulk filter string to a map with key = filter name and values = filter values.
     * <p>
     * The string is a list of semicolon-separated {@code name=value} elements; a value may itself
     * be a comma-separated list. Names are lower-cased (locale-independently) and must be members
     * of {@link BulkResponseImpl#BULK_FILTER_NAMES}. Names containing {@code "time"} must carry a
     * value parseable by {@link DateUtils#parseDateUTC(String)}; values of
     * {@link BulkResponseImpl#BULK_FILTER_STATUS} must name a {@link CoordinatorAction.Status}.
     * The {@link BulkResponseImpl#BULK_FILTER_BUNDLE} key is mandatory.
     * <p>
     * A {@code null} string is treated like an empty one and therefore fails the mandatory
     * bundle check instead of raising a {@link NullPointerException}.
     *
     * @param bulkParams the filter string, may be {@code null}
     * @return filter key-value pair map; stored values are trimmed
     * @throws BundleEngineException thrown if failed to parse filter string
     */
    public static Map<String, List<String>> parseBulkFilter(String bulkParams) throws BundleEngineException {

        Map<String, List<String>> bulkFilter = new HashMap<String, List<String>>();
        // Functionality can be extended to different job levels - TODO extend filter parser and query
        // E.g. String filterlevel = "coordinatoraction"; BulkResponseImpl.BULK_FILTER_LEVEL
        // Guard on the input string: the map above was just created and can never be null.
        if (bulkParams != null) {
            StringTokenizer st = new StringTokenizer(bulkParams, ";");
            while (st.hasMoreTokens()) {
                String token = st.nextToken();
                if (token.contains("=")) {
                    String[] pair = token.split("=");
                    if (pair.length != 2) {
                        throw new BundleEngineException(ErrorCode.E0420, token,
                                "elements must be semicolon-separated name=value pairs");
                    }
                    // Locale.ROOT keeps key folding stable under locales with special casing rules.
                    pair[0] = pair[0].toLowerCase(Locale.ROOT);
                    String[] values = pair[1].split(",");
                    if (!BulkResponseImpl.BULK_FILTER_NAMES.contains(pair[0])) {
                        throw new BundleEngineException(ErrorCode.E0420, token, XLog.format("invalid parameter name [{0}]",
                                pair[0]));
                    }
                    // special check and processing for time related params
                    if (pair[0].contains("time")) {
                        try {
                            DateUtils.parseDateUTC(pair[1]);
                        }
                        catch (ParseException e) {
                            throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
                                    "invalid value [{0}] for time. A datetime value of pattern [{1}] is expected", pair[1],
                                    DateUtils.ISO8601_UTC_MASK));
                        }
                    }
                    // special check for action status param
                    // TODO: when extended for levels other than coord action, check against corresponding level's Status values
                    if (pair[0].equals(BulkResponseImpl.BULK_FILTER_STATUS)) {
                        for (String value : values) {
                            try {
                                // Validate the trimmed form, since that is what gets stored below.
                                CoordinatorAction.Status.valueOf(value.trim());
                            }
                            catch (IllegalArgumentException ex) {
                                throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
                                        "invalid action status [{0}]", value));
                            }
                        }
                    }
                    // eventually adding into map for all cases e.g. names, times, status
                    List<String> list = bulkFilter.get(pair[0]);
                    if (list == null) {
                        list = new ArrayList<String>();
                        bulkFilter.put(pair[0], list);
                    }
                    for (String value : values) {
                        value = value.trim();
                        if (value.isEmpty()) {
                            throw new BundleEngineException(ErrorCode.E0420, token, "value is empty or whitespace");
                        }
                        list.add(value);
                    }
                }
                else {
                    throw new BundleEngineException(ErrorCode.E0420, token,
                            "elements must be semicolon-separated name=value pairs");
                }
            }
        }
        // The bundle name is mandatory, whether the filter was null, empty or merely lacked it.
        if (!bulkFilter.containsKey(BulkResponseImpl.BULK_FILTER_BUNDLE)) {
            throw new BundleEngineException(ErrorCode.E0305, BulkResponseImpl.BULK_FILTER_BUNDLE);
        }
        return bulkFilter;
    }

    /**
     * Return the status for a Job ID
     *
     * @param jobId job Id.
     * @return the job's status
     * @throws BundleEngineException thrown if the job's status could not be obtained
     */
    @Override
    public String getJobStatus(String jobId) throws BundleEngineException {
        try {
            BundleJobBean bundleJob = BundleJobQueryExecutor.getInstance().get(
                    BundleJobQueryExecutor.BundleJobQuery.GET_BUNDLE_JOB_STATUS, jobId);
            return bundleJob.getStatusStr();
        }
        catch (JPAExecutorException e) {
            throw new BundleEngineException(e);
        }
    }

    /**
     * Enable SLA alerts by running a {@link BundleSLAAlertsEnableXCommand}.
     *
     * @param id bundle job id
     * @param actions passed to the command unchanged
     * @param dates passed to the command unchanged
     * @param childIds passed to the command unchanged
     * @throws BaseEngineException thrown (as a {@link BundleEngineException}) if the command fails
     */
    @Override
    public void enableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException {
        try {
            new BundleSLAAlertsEnableXCommand(id, actions, dates, childIds).call();
        }
        catch (CommandException e) {
            throw new BundleEngineException(e);
        }
    }

    /**
     * Disable SLA alerts by running a {@link BundleSLAAlertsDisableXCommand}.
     *
     * @param id bundle job id
     * @param actions passed to the command unchanged
     * @param dates passed to the command unchanged
     * @param childIds passed to the command unchanged
     * @throws BaseEngineException thrown (as a {@link BundleEngineException}) if the command fails
     */
    @Override
    public void disableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException {
        try {
            new BundleSLAAlertsDisableXCommand(id, actions, dates, childIds).call();
        }
        catch (CommandException e) {
            throw new BundleEngineException(e);
        }
    }

    /**
     * Change SLA settings by running a {@link BundleSLAChangeXCommand}.
     *
     * @param id bundle job id
     * @param actions passed to the command unchanged
     * @param dates passed to the command unchanged
     * @param childIds passed to the command unchanged
     * @param newParams change specification parsed with {@link JobUtils#parseChangeValue(String)};
     *        when {@code null} the command receives a {@code null} parameter map
     * @throws BaseEngineException thrown (as a {@link BundleEngineException}) if parsing or the
     *         command fails with a {@link CommandException}
     */
    @Override
    public void changeSLA(String id, String actions, String dates, String childIds, String newParams)
            throws BaseEngineException {
        Map<String, String> slaNewParams = null;
        try {
            if (newParams != null) {
                slaNewParams = JobUtils.parseChangeValue(newParams);
            }
            new BundleSLAChangeXCommand(id, actions, dates, childIds, slaNewParams).call();
        }
        catch (CommandException e) {
            throw new BundleEngineException(e);
        }
    }

    /**
     * return a list of killed Bundle job
     *
     * @param filter the filter string for which the bundle jobs are killed
     * @param start the starting index for bundle jobs
     * @param len maximum number of jobs to be killed
     * @return the list of jobs being killed
     * @throws BundleEngineException thrown if one or more of the jobs cannot be killed
     */
    public BundleJobInfo killJobs(String filter, int start, int len) throws BundleEngineException {
        return runBulkOperation(filter, start, len, OperationType.Kill);
    }

    /**
     * return a list of suspended Bundle job
     *
     * @param filter the filter string for which the bundle jobs are suspended
     * @param start the starting index for bundle jobs
     * @param len maximum number of jobs to be suspended
     * @return the list of jobs being suspended
     * @throws BundleEngineException thrown if one or more of the jobs cannot be suspended
     */
    public BundleJobInfo suspendJobs(String filter, int start, int len) throws BundleEngineException {
        return runBulkOperation(filter, start, len, OperationType.Suspend);
    }

    /**
     * return a list of resumed Bundle job
     *
     * @param filter the filter string for which the bundle jobs are resumed
     * @param start the starting index for bundle jobs
     * @param len maximum number of jobs to be resumed
     * @return the list of jobs being resumed
     * @throws BundleEngineException thrown if one or more of the jobs cannot be resumed
     */
    public BundleJobInfo resumeJobs(String filter, int start, int len) throws BundleEngineException {
        return runBulkOperation(filter, start, len, OperationType.Resume);
    }

    /**
     * Shared body of the bulk kill/suspend/resume operations: parse the filter, run a
     * {@link BulkBundleXCommand} for the given operation and substitute an empty
     * {@link BundleJobInfo} when the command returns {@code null}.
     */
    private BundleJobInfo runBulkOperation(String filter, int start, int len, OperationType operation)
            throws BundleEngineException {
        try {
            Map<String, List<String>> filterList = parseFilter(filter);
            BundleJobInfo bundleJobInfo = new BulkBundleXCommand(filterList, start, len, operation).call();
            if (bundleJobInfo == null) {
                return new BundleJobInfo(new ArrayList<BundleJobBean>(), 0, 0, 0);
            }
            return bundleJobInfo;
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }
}