/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie;

import java.io.IOException;
import java.io.Writer;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.BaseEngine.LOG_TYPE;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.rest.BulkResponseImpl;
import org.apache.oozie.command.BulkJobsXCommand;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.bundle.BundleSLAAlertsDisableXCommand;
import org.apache.oozie.command.bundle.BundleSLAAlertsEnableXCommand;
import org.apache.oozie.command.bundle.BundleSLAChangeXCommand;
import org.apache.oozie.command.bundle.BulkBundleXCommand;
import org.apache.oozie.command.bundle.BundleJobChangeXCommand;
import org.apache.oozie.command.bundle.BundleJobResumeXCommand;
import org.apache.oozie.command.bundle.BundleJobSuspendXCommand;
import org.apache.oozie.command.bundle.BundleJobXCommand;
import org.apache.oozie.command.bundle.BundleJobsXCommand;
import org.apache.oozie.command.bundle.BundleKillXCommand;
import org.apache.oozie.command.bundle.BundleRerunXCommand;
import org.apache.oozie.command.bundle.BundleStartXCommand;
import org.apache.oozie.command.bundle.BundleSubmitXCommand;
import org.apache.oozie.command.OperationType;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.JobsFilterUtils;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.XLogAuditFilter;
import org.apache.oozie.util.XLogFilter;
import org.apache.oozie.util.XLogUserFilterParam;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;

import com.google.common.annotations.VisibleForTesting;

import javax.servlet.ServletException;

/**
 * {@link BaseEngine} implementation for bundle jobs.
 * <p>
 * Most operations build the matching bundle X-command, invoke {@code call()} on it and translate a
 * {@link CommandException} into a {@link BundleEngineException}. Operations that only make sense for workflow or
 * coordinator jobs are rejected with {@link ErrorCode#E0301}.
 */
public class BundleEngine extends BaseEngine {
    /**
     * Create a system Bundle engine, with no user and no group.
     */
    public BundleEngine() {
    }

    /**
     * Create a Bundle engine to perform operations on behalf of a user.
     *
     * @param user user name; validated with {@code ParamChecker.notEmpty}.
     */
    public BundleEngine(String user) {
        this.user = ParamChecker.notEmpty(user, "user");
    }

    /**
     * Change a bundle job by running a {@link BundleJobChangeXCommand}.
     *
     * @param jobId bundle job id
     * @param changeValue change value handed to the command unchanged
     * @throws BundleEngineException thrown if the command fails
     * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
     */
    @Override
    public void change(String jobId, String changeValue) throws BundleEngineException {
        try {
            new BundleJobChangeXCommand(jobId, changeValue).call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Run a {@link BundleSubmitXCommand} created with its boolean constructor flag set to {@code true}
     * (presumably the dry-run switch, going by this method's name).
     *
     * @param conf job configuration
     * @return the string returned by the submit command
     * @throws BundleEngineException thrown if the command fails
     * @see org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
     */
    @Override
    public String dryRunSubmit(Configuration conf) throws BundleEngineException {
        BundleSubmitXCommand submit = new BundleSubmitXCommand(true, conf);
        try {
            return submit.call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Not supported by the bundle engine.
     *
     * @param jobId ignored
     * @return never returns normally
     * @throws BundleEngineException always, with error code E0301
     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
     */
    @Override
    public CoordinatorJob getCoordJob(String jobId) throws BundleEngineException {
        throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from BundleEngine"));
    }

    /**
     * Load a bundle job through a {@link BundleJobXCommand}.
     *
     * @param jobId bundle job id
     * @return the bundle job bean returned by the command
     * @throws BundleEngineException thrown if the command fails
     */
    public BundleJobBean getBundleJob(String jobId) throws BundleEngineException {
        try {
            return new BundleJobXCommand(jobId).call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Not supported by the bundle engine.
     *
     * @param jobId ignored
     * @param filter ignored
     * @param start ignored
     * @param length ignored
     * @param desc ignored
     * @return never returns normally
     * @throws BundleEngineException always, with error code E0301
     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, java.lang.String, int, int, boolean)
     */
    @Override
    public CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc)
            throws BundleEngineException {
        throw new BundleEngineException(new XException(ErrorCode.E0301,
                "cannot get a coordinator job from BundleEngine"));
    }

    /**
     * Return the original job XML of a bundle job.
     *
     * @param jobId bundle job id
     * @return the value of {@code getOrigJobXml()} on the loaded bundle job
     * @throws BundleEngineException thrown if the job cannot be loaded
     * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
     */
    @Override
    public String getDefinition(String jobId) throws BundleEngineException {
        BundleJobBean job;
        try {
            job = new BundleJobXCommand(jobId).call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
        return job.getOrigJobXml();
    }

    /**
     * Not supported by the bundle engine.
     *
     * @param jobId ignored
     * @return never returns normally
     * @throws BundleEngineException always, with error code E0301
     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
     */
    @Override
    public WorkflowJob getJob(String jobId) throws BundleEngineException {
        throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
    }

    /**
     * Not supported by the bundle engine.
     *
     * @param jobId ignored
     * @param start ignored
     * @param length ignored
     * @return never returns normally
     * @throws BundleEngineException always, with error code E0301
     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
     */
    @Override
    public WorkflowJob getJob(String jobId, int start, int length) throws BundleEngineException {
        throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
    }

    /**
     * The bundle engine performs no external-id lookup.
     *
     * @param externalId ignored
     * @return always {@code null}
     * @throws BundleEngineException never thrown by this implementation
     * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
     */
    @Override
    public String getJobIdForExternalId(String externalId) throws BundleEngineException {
        return null;
    }

    /**
     * Kill a bundle job by running a {@link BundleKillXCommand}.
     *
     * @param jobId bundle job id
     * @throws BundleEngineException thrown if the command fails
     * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
     */
    @Override
    public void kill(String jobId) throws BundleEngineException {
        try {
            new BundleKillXCommand(jobId).call();
        }
        catch (CommandException e) {
            throw new BundleEngineException(e);
        }
    }

    /**
     * Not supported by the bundle engine; use
     * {@link #reRun(String, String, String, boolean, boolean)} instead.
     *
     * @param jobId ignored
     * @param conf ignored
     * @throws BundleEngineException always, with error code E0301
     * @see org.apache.oozie.BaseEngine#reRun(java.lang.String, org.apache.hadoop.conf.Configuration)
     */
    @Override
    @Deprecated
    public void reRun(String jobId, Configuration conf) throws BundleEngineException {
        throw new BundleEngineException(new XException(ErrorCode.E0301, "rerun"));
    }

    /**
     * Rerun Bundle actions for given rerunType
     *
     * @param jobId bundle job id
     * @param coordScope the rerun scope for coordinator job names separated by ","
     * @param dateScope the rerun scope for coordinator nominal times separated by ","
     * @param refresh true if user wants to refresh input/output dataset urls
     * @param noCleanup false if user wants to cleanup output events for given rerun actions
     * @throws BaseEngineException thrown if failed to rerun
     */
    public void reRun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup)
            throws BaseEngineException {
        try {
            new BundleRerunXCommand(jobId, coordScope, dateScope, refresh, noCleanup).call();
        }
        catch (CommandException ex) {
            throw new BaseEngineException(ex);
        }
    }

    /**
     * Resume a bundle job by running a {@link BundleJobResumeXCommand}.
     *
     * @param jobId bundle job id
     * @throws BundleEngineException thrown if the command fails
     * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
     */
    @Override
    public void resume(String jobId) throws BundleEngineException {
        BundleJobResumeXCommand resume = new BundleJobResumeXCommand(jobId);
        try {
            resume.call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Start a bundle job by running a {@link BundleStartXCommand}.
     *
     * @param jobId bundle job id
     * @throws BundleEngineException thrown if the command fails
     * @see org.apache.oozie.BaseEngine#start(java.lang.String)
     */
    @Override
    public void start(String jobId) throws BundleEngineException {
        try {
            new BundleStartXCommand(jobId).call();
        }
        catch (CommandException e) {
            throw new BundleEngineException(e);
        }
    }

    /**
     * Stream the {@code LOG_TYPE.LOG} log of a bundle job to the writer.
     *
     * @param jobId bundle job id
     * @param writer destination of the log
     * @param params request parameters used to build the user log filter
     * @throws IOException thrown on any failure while building the filter, loading the job or fetching the log
     * @throws BundleEngineException declared for interface compatibility
     */
    @Override
    public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
            BundleEngineException {
        streamJobLog(jobId, writer, params, LOG_TYPE.LOG);
    }

    /**
     * Stream the {@code LOG_TYPE.ERROR_LOG} log of a bundle job to the writer.
     *
     * @param jobId bundle job id
     * @param writer destination of the log
     * @param params request parameters used to build the user log filter
     * @throws IOException thrown on any failure while building the filter, loading the job or fetching the log
     * @throws BundleEngineException declared for interface compatibility
     */
    @Override
    public void streamErrorLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
            BundleEngineException {
        streamJobLog(jobId, writer, params, LOG_TYPE.ERROR_LOG);
    }

    /**
     * Stream the {@code LOG_TYPE.AUDIT_LOG} log of a bundle job to the writer, using an {@link XLogAuditFilter}.
     *
     * @param jobId bundle job id
     * @param writer destination of the log
     * @param params request parameters used to build the user log filter
     * @throws IOException thrown on any failure while building the filter, loading the job or fetching the log
     * @throws BundleEngineException declared for interface compatibility
     */
    @Override
    public void streamAuditLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
            BundleEngineException {
        try {
            streamJobLog(new XLogAuditFilter(new XLogUserFilterParam(params)), jobId, writer, params, LOG_TYPE.AUDIT_LOG);
        }
        catch (CommandException e) {
            throw new IOException(e);
        }
    }

    /**
     * Build a plain {@link XLogFilter} from the request parameters and stream the requested log type.
     * Every exception, including an {@code IOException} raised further down, is wrapped in a new
     * {@code IOException}.
     */
    private void streamJobLog(String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
            throws IOException, BundleEngineException {
        try {
            streamJobLog(new XLogFilter(new XLogUserFilterParam(params)), jobId, writer, params, logType);
        }
        catch (Exception e) {
            throw new IOException(e);
        }
    }

    /**
     * Load the bundle job and fetch its log between the job's created time and an end time.
     * The end time is the job's last-modified time when the job is in a terminal status and that time is set;
     * otherwise it is the current time.
     */
    private void streamJobLog(XLogFilter filter, String jobId, Writer writer, Map<String, String[]> params, LOG_TYPE logType)
            throws IOException, BundleEngineException {
        try {
            filter.setParameter(DagXLogInfoService.JOB, jobId);
            BundleJobBean job = new BundleJobXCommand(jobId).call();
            Date lastTime = null;
            if (job.isTerminalStatus()) {
                lastTime = job.getLastModifiedTime();
            }
            if (lastTime == null) {
                lastTime = new Date();
            }
            fetchLog(filter, job.getCreatedTime(), lastTime, writer, params, logType);
        }
        catch (Exception ex) {
            throw new IOException(ex);
        }
    }

    /**
     * Submit a bundle job through a {@link BundleSubmitXCommand} and optionally start it.
     *
     * @param conf job configuration
     * @param startJob if true, {@link #start(String)} is invoked with the new job id
     * @return the id returned by the submit command
     * @throws BundleEngineException thrown if submission or start fails
     * @see org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean)
     */
    @Override
    public String submitJob(Configuration conf, boolean startJob) throws BundleEngineException {
        try {
            String jobId = new BundleSubmitXCommand(conf).call();

            if (startJob) {
                start(jobId);
            }
            return jobId;
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Suspend a bundle job by running a {@link BundleJobSuspendXCommand}.
     *
     * @param jobId bundle job id
     * @throws BundleEngineException thrown if the command fails
     * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
     */
    @Override
    public void suspend(String jobId) throws BundleEngineException {
        BundleJobSuspendXCommand suspend = new BundleJobSuspendXCommand(jobId);
        try {
            suspend.call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Get bundle jobs
     *
     * @param filter the filter string
     * @param start start location for paging
     * @param len total length to get
     * @return bundle job info
     * @throws BundleEngineException thrown if failed to get bundle job info
     */
    public BundleJobInfo getBundleJobs(String filter, int start, int len) throws BundleEngineException {
        Map<String, List<String>> filterList = parseFilter(filter);

        try {
            return new BundleJobsXCommand(filterList, start, len).call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Parse filter string to a map with key = filter name and values = filter values
     *
     * @param filter the filter string
     * @return filter key and value map
     * @throws BundleEngineException thrown if failed to parse filter string
     */
    @VisibleForTesting
    Map<String, List<String>> parseFilter(String filter) throws BundleEngineException {
        try {
            return JobsFilterUtils.parseFilter(filter);
        }
        catch (ServletException ex) {
            throw new BundleEngineException(ErrorCode.E0420, filter, ex.getMessage());
        }
    }

    /**
     * Get bulk job response
     *
     * @param bulkFilter the filter string
     * @param start start location for paging
     * @param len total length to get
     * @return bulk job info
     * @throws BundleEngineException thrown if failed to get bulk job info
     */
    public BulkResponseInfo getBulkJobs(String bulkFilter, int start, int len) throws BundleEngineException {
        Map<String,List<String>> bulkRequestMap = parseBulkFilter(bulkFilter);
        try {
            return new BulkJobsXCommand(bulkRequestMap, start, len).call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Parse a bulk filter string to a map with key = filter name and values = filter values.
     * <p>
     * The string is a ";"-separated list of {@code name=value} elements; a value may itself be a ","-separated
     * list. Names are lower-cased and must be contained in {@code BulkResponseImpl.BULK_FILTER_NAMES}. Names
     * containing "time" must carry a value accepted by {@code DateUtils.parseDateUTC}, and the values of
     * {@code BulkResponseImpl.BULK_FILTER_STATUS} must be {@link CoordinatorAction.Status} names. The
     * {@code BulkResponseImpl.BULK_FILTER_BUNDLE} entry is mandatory.
     *
     * @param bulkParams the filter string
     * @return filter key-value pair map
     * @throws BundleEngineException thrown if failed to parse filter string (E0420), or if the mandatory bundle
     *         entry is missing or {@code bulkParams} is null (E0305)
     */
    public static Map<String,List<String>> parseBulkFilter(String bulkParams) throws BundleEngineException {
        // A null filter cannot contain the mandatory bundle entry. Report it the same way as a filter that
        // lacks the entry instead of failing with a NullPointerException inside StringTokenizer.
        if (bulkParams == null) {
            throw new BundleEngineException(ErrorCode.E0305, BulkResponseImpl.BULK_FILTER_BUNDLE);
        }

        Map<String,List<String>> bulkFilter = new HashMap<String,List<String>>();
        // Functionality can be extended to different job levels - TODO extend filter parser and query
        // E.g. String filterlevel = "coordinatoraction"; BulkResponseImpl.BULK_FILTER_LEVEL
        StringTokenizer st = new StringTokenizer(bulkParams, ";");
        while (st.hasMoreTokens()) {
            String token = st.nextToken();
            if (!token.contains("=")) {
                throw new BundleEngineException(ErrorCode.E0420, token, "elements must be name=value pairs");
            }
            String[] pair = token.split("=");
            if (pair.length != 2) {
                throw new BundleEngineException(ErrorCode.E0420, token,
                        "elements must be name=value pairs");
            }
            // Locale.ROOT keeps the lookup independent of the JVM default locale (e.g. Turkish dotless i)
            pair[0] = pair[0].toLowerCase(Locale.ROOT);
            String[] values = pair[1].split(",");
            if (!BulkResponseImpl.BULK_FILTER_NAMES.contains(pair[0])) {
                throw new BundleEngineException(ErrorCode.E0420, token, XLog.format("invalid parameter name [{0}]",
                        pair[0]));
            }
            // special check and processing for time related params
            if (pair[0].contains("time")) {
                try {
                    DateUtils.parseDateUTC(pair[1]);
                }
                catch (ParseException e) {
                    throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
                            "invalid value [{0}] for time. A datetime value of pattern [{1}] is expected", pair[1],
                            DateUtils.ISO8601_UTC_MASK));
                }
            }
            // special check for action status param
            // TODO: when extended for levels other than coord action, check against corresponding level's Status values
            if (pair[0].equals(BulkResponseImpl.BULK_FILTER_STATUS)) {
                for (String value : values) {
                    try {
                        // validate the trimmed form, because that is what gets stored below
                        CoordinatorAction.Status.valueOf(value.trim());
                    }
                    catch (IllegalArgumentException ex) {
                        throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
                                "invalid action status [{0}]", value));
                    }
                }
            }
            // eventually adding into map for all cases e.g. names, times, status
            List<String> list = bulkFilter.get(pair[0]);
            if (list == null) {
                list = new ArrayList<String>();
                bulkFilter.put(pair[0], list);
            }
            for (String value : values) {
                value = value.trim();
                if (value.isEmpty()) {
                    throw new BundleEngineException(ErrorCode.E0420, token, "value is empty or whitespace");
                }
                list.add(value);
            }
        }
        if (!bulkFilter.containsKey(BulkResponseImpl.BULK_FILTER_BUNDLE)) {
            throw new BundleEngineException(ErrorCode.E0305, BulkResponseImpl.BULK_FILTER_BUNDLE);
        }
        return bulkFilter;
    }

    /**
     * Return the status for a Job ID
     *
     * @param jobId job Id.
     * @return the job's status
     * @throws BundleEngineException thrown if the job's status could not be obtained
     */
    @Override
    public String getJobStatus(String jobId) throws BundleEngineException {
        try {
            BundleJobBean bundleJob = BundleJobQueryExecutor.getInstance().get(
                    BundleJobQueryExecutor.BundleJobQuery.GET_BUNDLE_JOB_STATUS, jobId);
            return bundleJob.getStatusStr();
        }
        catch (JPAExecutorException e) {
            throw new BundleEngineException(e);
        }
    }

    /**
     * Enable SLA alerts by running a {@link BundleSLAAlertsEnableXCommand}.
     *
     * @param id job id handed to the command
     * @param actions actions argument handed to the command unchanged
     * @param dates dates argument handed to the command unchanged
     * @param childIds child ids argument handed to the command unchanged
     * @throws BaseEngineException thrown (as a {@link BundleEngineException}) if the command fails
     */
    @Override
    public void enableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException {
        try {
            new BundleSLAAlertsEnableXCommand(id, actions, dates, childIds).call();
        }
        catch (CommandException e) {
            throw new BundleEngineException(e);
        }
    }

    /**
     * Disable SLA alerts by running a {@link BundleSLAAlertsDisableXCommand}.
     *
     * @param id job id handed to the command
     * @param actions actions argument handed to the command unchanged
     * @param dates dates argument handed to the command unchanged
     * @param childIds child ids argument handed to the command unchanged
     * @throws BaseEngineException thrown (as a {@link BundleEngineException}) if the command fails
     */
    @Override
    public void disableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException {
        try {
            new BundleSLAAlertsDisableXCommand(id, actions, dates, childIds).call();
        }
        catch (CommandException e) {
            throw new BundleEngineException(e);
        }
    }

    /**
     * Change SLA settings by running a {@link BundleSLAChangeXCommand}.
     *
     * @param id job id handed to the command
     * @param actions actions argument handed to the command unchanged
     * @param dates dates argument handed to the command unchanged
     * @param childIds child ids argument handed to the command unchanged
     * @param newParams new SLA parameters; parsed with {@code JobUtils.parseChangeValue} when not null, otherwise
     *        the command receives a null parameter map
     * @throws BaseEngineException thrown (as a {@link BundleEngineException}) if parsing or the command fails
     */
    @Override
    public void changeSLA(String id, String actions, String dates, String childIds, String newParams)
            throws BaseEngineException {
        Map<String, String> slaNewParams = null;
        try {

            if (newParams != null) {
                slaNewParams = JobUtils.parseChangeValue(newParams);
            }
            new BundleSLAChangeXCommand(id, actions, dates, childIds, slaNewParams).call();
        }
        catch (CommandException e) {
            throw new BundleEngineException(e);
        }
    }

    /**
     * return a list of killed Bundle job
     *
     * @param filter the filter string for which the bundle jobs are killed
     * @param start the starting index for bundle jobs
     * @param len maximum number of jobs to be killed
     * @return the list of jobs being killed
     * @throws BundleEngineException thrown if one or more of the jobs cannot be killed
     */
    public BundleJobInfo killJobs(String filter, int start, int len) throws BundleEngineException {
        return runBulkOperation(filter, start, len, OperationType.Kill);
    }

    /**
     * return a list of suspended Bundle job
     *
     * @param filter the filter string for which the bundle jobs are suspended
     * @param start the starting index for bundle jobs
     * @param len maximum number of jobs to be suspended
     * @return the list of jobs being suspended
     * @throws BundleEngineException thrown if one or more of the jobs cannot be suspended
     */
    public BundleJobInfo suspendJobs(String filter, int start, int len) throws BundleEngineException {
        return runBulkOperation(filter, start, len, OperationType.Suspend);
    }

    /**
     * return a list of resumed Bundle job
     *
     * @param filter the filter string for which the bundle jobs are resumed
     * @param start the starting index for bundle jobs
     * @param len maximum number of jobs to be resumed
     * @return the list of jobs being resumed
     * @throws BundleEngineException thrown if one or more of the jobs cannot be resumed
     */
    public BundleJobInfo resumeJobs(String filter, int start, int len) throws BundleEngineException {
        return runBulkOperation(filter, start, len, OperationType.Resume);
    }

    /**
     * Parse the filter, run a {@link BulkBundleXCommand} for the given operation and substitute an empty
     * {@link BundleJobInfo} when the command returns null.
     */
    private BundleJobInfo runBulkOperation(String filter, int start, int len, OperationType operation)
            throws BundleEngineException {
        try {
            Map<String, List<String>> filterList = parseFilter(filter);
            BundleJobInfo bundleJobInfo = new BulkBundleXCommand(filterList, start, len, operation).call();
            if (bundleJobInfo == null) {
                return new BundleJobInfo(new ArrayList<BundleJobBean>(), 0, 0, 0);
            }
            return bundleJobInfo;
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }
}