001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie; 020 021import java.io.IOException; 022import java.io.Writer; 023import java.text.ParseException; 024import java.util.ArrayList; 025import java.util.Date; 026import java.util.HashMap; 027import java.util.List; 028import java.util.Map; 029import java.util.StringTokenizer; 030 031import javax.servlet.ServletException; 032 033import org.apache.hadoop.conf.Configuration; 034import org.apache.oozie.client.CoordinatorAction; 035import org.apache.oozie.client.CoordinatorJob; 036import org.apache.oozie.client.WorkflowJob; 037import org.apache.oozie.client.rest.BulkResponseImpl; 038import org.apache.oozie.command.BulkJobsXCommand; 039import org.apache.oozie.command.CommandException; 040import org.apache.oozie.command.OperationType; 041import org.apache.oozie.command.bundle.BulkBundleXCommand; 042import org.apache.oozie.command.bundle.BundleJobChangeXCommand; 043import org.apache.oozie.command.bundle.BundleJobResumeXCommand; 044import org.apache.oozie.command.bundle.BundleJobSuspendXCommand; 045import org.apache.oozie.command.bundle.BundleJobXCommand; 046import org.apache.oozie.command.bundle.BundleJobsXCommand; 047import org.apache.oozie.command.bundle.BundleKillXCommand; 048import org.apache.oozie.command.bundle.BundleRerunXCommand; 049import org.apache.oozie.command.bundle.BundleSLAAlertsDisableXCommand; 050import org.apache.oozie.command.bundle.BundleSLAAlertsEnableXCommand; 051import org.apache.oozie.command.bundle.BundleSLAChangeXCommand; 052import org.apache.oozie.command.bundle.BundleStartXCommand; 053import org.apache.oozie.command.bundle.BundleSubmitXCommand; 054import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; 055import org.apache.oozie.executor.jpa.JPAExecutorException; 056import org.apache.oozie.service.DagXLogInfoService; 057import org.apache.oozie.service.Services; 058import org.apache.oozie.service.XLogStreamingService; 059import org.apache.oozie.util.DateUtils; 060import org.apache.oozie.util.JobUtils; 061import org.apache.oozie.util.JobsFilterUtils; 062import org.apache.oozie.util.ParamChecker; 063import org.apache.oozie.util.XLog; 064import org.apache.oozie.util.XLogStreamer; 065 066import com.google.common.annotations.VisibleForTesting; 067 068public class BundleEngine extends BaseEngine { 069 /** 070 * Create a system Bundle engine, with no user and no group. 071 */ 072 public BundleEngine() { 073 } 074 075 /** 076 * Create a Bundle engine to perform operations on behave of a user. 077 * 078 * @param user user name. 079 */ 080 public BundleEngine(String user) { 081 this.user = ParamChecker.notEmpty(user, "user"); 082 } 083 084 /* (non-Javadoc) 085 * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String) 086 */ 087 @Override 088 public void change(String jobId, String changeValue) throws BundleEngineException { 089 try { 090 BundleJobChangeXCommand change = new BundleJobChangeXCommand(jobId, changeValue); 091 change.call(); 092 } 093 catch (CommandException ex) { 094 throw new BundleEngineException(ex); 095 } 096 } 097 098 /* (non-Javadoc) 099 * @see org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration) 100 */ 101 @Override 102 public String dryRunSubmit(Configuration conf) throws BundleEngineException { 103 BundleSubmitXCommand submit = new BundleSubmitXCommand(true, conf); 104 try { 105 String jobId = submit.call(); 106 return jobId; 107 } 108 catch (CommandException ex) { 109 throw new BundleEngineException(ex); 110 } 111 } 112 113 /* (non-Javadoc) 114 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String) 115 */ 116 @Override 117 public CoordinatorJob getCoordJob(String jobId) throws BundleEngineException { 118 throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from BundleEngine")); 119 } 120 121 public BundleJobBean getBundleJob(String jobId) throws BundleEngineException { 122 try { 123 return new BundleJobXCommand(jobId).call(); 124 } 125 catch (CommandException ex) { 126 throw new BundleEngineException(ex); 127 } 128 } 129 130 /* (non-Javadoc) 131 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, int, int) 132 */ 133 @Override 134 public CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc) 135 throws BundleEngineException { 136 throw new BundleEngineException(new XException(ErrorCode.E0301, 137 "cannot get a coordinator job from BundleEngine")); 138 } 139 140 /* (non-Javadoc) 141 * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String) 142 */ 143 @Override 144 public String getDefinition(String jobId) throws BundleEngineException { 145 BundleJobBean job; 146 try { 147 job = new BundleJobXCommand(jobId).call(); 148 } 149 catch (CommandException ex) { 150 throw new BundleEngineException(ex); 151 } 152 return job.getOrigJobXml(); 153 } 154 155 /* (non-Javadoc) 156 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String) 157 */ 158 @Override 159 public WorkflowJob getJob(String jobId) throws BundleEngineException { 160 throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine")); 161 } 162 163 /* (non-Javadoc) 164 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int) 165 */ 166 @Override 167 public WorkflowJob getJob(String jobId, int start, int length) throws BundleEngineException { 168 throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine")); 169 } 170 171 /* (non-Javadoc) 172 * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String) 173 */ 174 @Override 175 public String getJobIdForExternalId(String externalId) throws BundleEngineException { 176 return null; 177 } 178 179 /* (non-Javadoc) 180 * @see org.apache.oozie.BaseEngine#kill(java.lang.String) 181 */ 182 @Override 183 public void kill(String jobId) throws BundleEngineException { 184 try { 185 new BundleKillXCommand(jobId).call(); 186 } 187 catch (CommandException e) { 188 throw new BundleEngineException(e); 189 } 190 } 191 192 /* (non-Javadoc) 193 * @see org.apache.oozie.BaseEngine#reRun(java.lang.String, org.apache.hadoop.conf.Configuration) 194 */ 195 @Override 196 @Deprecated 197 public void reRun(String jobId, Configuration conf) throws BundleEngineException { 198 throw new BundleEngineException(new XException(ErrorCode.E0301, "rerun")); 199 } 200 201 /** 202 * Rerun Bundle actions for given rerunType 203 * 204 * @param jobId bundle job id 205 * @param coordScope the rerun scope for coordinator job names separated by "," 206 * @param dateScope the rerun scope for coordinator nominal times separated by "," 207 * @param refresh true if user wants to refresh input/outpur dataset urls 208 * @param noCleanup false if user wants to cleanup output events for given rerun actions 209 * @throws BaseEngineException thrown if failed to rerun 210 */ 211 public void reRun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) 212 throws BaseEngineException { 213 try { 214 new BundleRerunXCommand(jobId, coordScope, dateScope, refresh, noCleanup).call(); 215 } 216 catch (CommandException ex) { 217 throw new BaseEngineException(ex); 218 } 219 } 220 221 /* (non-Javadoc) 222 * @see org.apache.oozie.BaseEngine#resume(java.lang.String) 223 */ 224 @Override 225 public void resume(String jobId) throws BundleEngineException { 226 BundleJobResumeXCommand resume = new BundleJobResumeXCommand(jobId); 227 try { 228 resume.call(); 229 } 230 catch (CommandException ex) { 231 throw new BundleEngineException(ex); 232 } 233 } 234 235 /* (non-Javadoc) 236 * @see org.apache.oozie.BaseEngine#start(java.lang.String) 237 */ 238 @Override 239 public void start(String jobId) throws BundleEngineException { 240 try { 241 new BundleStartXCommand(jobId).call(); 242 } 243 catch (CommandException e) { 244 throw new BundleEngineException(e); 245 } 246 } 247 248 protected void streamJobLog(XLogStreamer logStreamer, String jobId, Writer writer) throws IOException, 249 BundleEngineException { 250 try { 251 BundleJobBean job; 252 logStreamer.getXLogFilter().setParameter(DagXLogInfoService.JOB, jobId); 253 job = new BundleJobXCommand(jobId).call(); 254 Date lastTime = null; 255 if (job.isTerminalStatus()) { 256 lastTime = job.getLastModifiedTime(); 257 } 258 if (lastTime == null) { 259 lastTime = new Date(); 260 } 261 Services.get().get(XLogStreamingService.class) 262 .streamLog(logStreamer, job.getCreatedTime(), lastTime, writer); 263 } 264 catch (CommandException ex) { 265 throw new IOException(ex); 266 } 267 } 268 269 /* (non-Javadoc) 270 * @see org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean) 271 */ 272 @Override 273 public String submitJob(Configuration conf, boolean startJob) throws BundleEngineException { 274 try { 275 String jobId = new BundleSubmitXCommand(conf).call(); 276 277 if (startJob) { 278 start(jobId); 279 } 280 return jobId; 281 } 282 catch (CommandException ex) { 283 throw new BundleEngineException(ex); 284 } 285 } 286 287 /* (non-Javadoc) 288 * @see org.apache.oozie.BaseEngine#suspend(java.lang.String) 289 */ 290 @Override 291 public void suspend(String jobId) throws BundleEngineException { 292 BundleJobSuspendXCommand suspend = new BundleJobSuspendXCommand(jobId); 293 try { 294 suspend.call(); 295 } 296 catch (CommandException ex) { 297 throw new BundleEngineException(ex); 298 } 299 } 300 301 /** 302 * Get bundle jobs 303 * 304 * @param filter the filter string 305 * @param start start location for paging 306 * @param len total length to get 307 * @return bundle job info 308 * @throws BundleEngineException thrown if failed to get bundle job info 309 */ 310 public BundleJobInfo getBundleJobs(String filter, int start, int len) throws BundleEngineException { 311 Map<String, List<String>> filterList = parseFilter(filter); 312 313 try { 314 return new BundleJobsXCommand(filterList, start, len).call(); 315 } 316 catch (CommandException ex) { 317 throw new BundleEngineException(ex); 318 } 319 } 320 321 /** 322 * Parse filter string to a map with key = filter name and values = filter values 323 * 324 * @param filter the filter string 325 * @return filter key and value map 326 * @throws CoordinatorEngineException thrown if failed to parse filter string 327 */ 328 @VisibleForTesting 329 Map<String, List<String>> parseFilter(String filter) throws BundleEngineException { 330 try { 331 return JobsFilterUtils.parseFilter(filter); 332 } 333 catch (ServletException ex) { 334 throw new BundleEngineException(ErrorCode.E0420, filter, ex.getMessage()); 335 } 336 } 337 338 /** 339 * Get bulk job response 340 * 341 * @param bulkFilter the filter string 342 * @param start start location for paging 343 * @param len total length to get 344 * @return bulk job info 345 * @throws BundleEngineException thrown if failed to get bulk job info 346 */ 347 public BulkResponseInfo getBulkJobs(String bulkFilter, int start, int len) throws BundleEngineException { 348 Map<String,List<String>> bulkRequestMap = parseBulkFilter(bulkFilter); 349 try { 350 return new BulkJobsXCommand(bulkRequestMap, start, len).call(); 351 } 352 catch (CommandException ex) { 353 throw new BundleEngineException(ex); 354 } 355 } 356 357 /** 358 * Parse filter string to a map with key = filter name and values = filter values 359 * Allowed keys are defined as constants on top 360 * 361 * @param bulkParams the filter string 362 * @return filter key-value pair map 363 * @throws BundleEngineException thrown if failed to parse filter string 364 */ 365 public static Map<String,List<String>> parseBulkFilter(String bulkParams) throws BundleEngineException { 366 367 Map<String,List<String>> bulkFilter = new HashMap<String,List<String>>(); 368 // Functionality can be extended to different job levels - TODO extend filter parser and query 369 // E.g. String filterlevel = "coordinatoraction"; BulkResponseImpl.BULK_FILTER_LEVEL 370 if (bulkFilter != null) { 371 StringTokenizer st = new StringTokenizer(bulkParams, ";"); 372 while (st.hasMoreTokens()) { 373 String token = st.nextToken(); 374 if (token.contains("=")) { 375 String[] pair = token.split("="); 376 if (pair.length != 2) { 377 throw new BundleEngineException(ErrorCode.E0420, token, 378 "elements must be semicolon-separated name=value pairs"); 379 } 380 pair[0] = pair[0].toLowerCase(); 381 String[] values = pair[1].split(","); 382 if (!BulkResponseImpl.BULK_FILTER_NAMES.contains(pair[0])) { 383 throw new BundleEngineException(ErrorCode.E0420, token, XLog.format("invalid parameter name [{0}]", 384 pair[0])); 385 } 386 // special check and processing for time related params 387 if (pair[0].contains("time")) { 388 try { 389 DateUtils.parseDateUTC(pair[1]); 390 } 391 catch (ParseException e) { 392 throw new BundleEngineException(ErrorCode.E0420, token, XLog.format( 393 "invalid value [{0}] for time. A datetime value of pattern [{1}] is expected", pair[1], 394 DateUtils.ISO8601_UTC_MASK)); 395 } 396 } 397 // special check for action status param 398 // TODO: when extended for levels other than coord action, check against corresponding level's Status values 399 if (pair[0].equals(BulkResponseImpl.BULK_FILTER_STATUS)) { 400 for(String value : values) { 401 try { 402 CoordinatorAction.Status.valueOf(value); 403 } 404 catch (IllegalArgumentException ex) { 405 throw new BundleEngineException(ErrorCode.E0420, token, XLog.format( 406 "invalid action status [{0}]", value)); 407 } 408 } 409 } 410 // eventually adding into map for all cases e.g. names, times, status 411 List<String> list = bulkFilter.get(pair[0]); 412 if (list == null) { 413 list = new ArrayList<String>(); 414 bulkFilter.put(pair[0], list); 415 } 416 for(String value : values) { 417 value = value.trim(); 418 if(value.isEmpty()) { 419 throw new BundleEngineException(ErrorCode.E0420, token, "value is empty or whitespace"); 420 } 421 list.add(value); 422 } 423 } else { 424 throw new BundleEngineException(ErrorCode.E0420, token, 425 "elements must be semicolon-separated name=value pairs"); 426 } 427 } 428 if (!bulkFilter.containsKey(BulkResponseImpl.BULK_FILTER_BUNDLE)) { 429 throw new BundleEngineException(ErrorCode.E0305, BulkResponseImpl.BULK_FILTER_BUNDLE); 430 } 431 } 432 return bulkFilter; 433 } 434 435 /** 436 * Return the status for a Job ID 437 * 438 * @param jobId job Id. 439 * @return the job's status 440 * @throws BundleEngineException thrown if the job's status could not be obtained 441 */ 442 @Override 443 public String getJobStatus(String jobId) throws BundleEngineException { 444 try { 445 BundleJobBean bundleJob = BundleJobQueryExecutor.getInstance().get( 446 BundleJobQueryExecutor.BundleJobQuery.GET_BUNDLE_JOB_STATUS, jobId); 447 return bundleJob.getStatusStr(); 448 } 449 catch (JPAExecutorException e) { 450 throw new BundleEngineException(e); 451 } 452 } 453 454 @Override 455 public void enableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException { 456 try { 457 new BundleSLAAlertsEnableXCommand(id, actions, dates, childIds).call(); 458 } 459 catch (CommandException e) { 460 throw new BundleEngineException(e); 461 } 462 } 463 464 @Override 465 public void disableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException { 466 try { 467 new BundleSLAAlertsDisableXCommand(id, actions, dates, childIds).call(); 468 } 469 catch (CommandException e) { 470 throw new BundleEngineException(e); 471 } 472 } 473 474 @Override 475 public void changeSLA(String id, String actions, String dates, String childIds, String newParams) 476 throws BaseEngineException { 477 Map<String, String> slaNewParams = null; 478 try { 479 480 if (newParams != null) { 481 slaNewParams = JobUtils.parseChangeValue(newParams); 482 } 483 new BundleSLAChangeXCommand(id, actions, dates, childIds, slaNewParams).call(); 484 } 485 catch (CommandException e) { 486 throw new BundleEngineException(e); 487 } 488 } 489 490 /** 491 * return a list of killed Bundle job 492 * 493 * @param filter the filter string for which the bundle jobs are killed 494 * @param start the starting index for bundle jobs 495 * @param len maximum number of jobs to be killed 496 * @return the list of jobs being killed 497 * @throws BundleEngineException thrown if one or more of the jobs cannot be killed 498 */ 499 public BundleJobInfo killJobs(String filter, int start, int len) throws BundleEngineException { 500 try { 501 Map<String, List<String>> filterList = parseFilter(filter); 502 BundleJobInfo bundleJobInfo = new BulkBundleXCommand(filterList, start, len, OperationType.Kill).call(); 503 if (bundleJobInfo == null) { 504 return new BundleJobInfo(new ArrayList<BundleJobBean>(), 0, 0, 0); 505 } 506 return bundleJobInfo; 507 } 508 catch (CommandException ex) { 509 throw new BundleEngineException(ex); 510 } 511 } 512 513 /** 514 * return a list of suspended Bundle job 515 * 516 * @param filter the filter string for which the bundle jobs are suspended 517 * @param start the starting index for bundle jobs 518 * @param len maximum number of jobs to be suspended 519 * @return the list of jobs being suspended 520 * @throws BundleEngineException thrown if one or more of the jobs cannot be suspended 521 */ 522 public BundleJobInfo suspendJobs(String filter, int start, int len) throws BundleEngineException { 523 try { 524 Map<String, List<String>> filterList = parseFilter(filter); 525 BundleJobInfo bundleJobInfo = new BulkBundleXCommand(filterList, start, len, OperationType.Suspend).call(); 526 if (bundleJobInfo == null) { 527 return new BundleJobInfo(new ArrayList<BundleJobBean>(), 0, 0, 0); 528 } 529 return bundleJobInfo; 530 } 531 catch (CommandException ex) { 532 throw new BundleEngineException(ex); 533 } 534 } 535 536 /** 537 * return a list of resumed Bundle job 538 * 539 * @param filter the filter string for which the bundle jobs are resumed 540 * @param start the starting index for bundle jobs 541 * @param len maximum number of jobs to be resumed 542 * @return the list of jobs being resumed 543 * @throws BundleEngineException thrown if one or more of the jobs cannot be resumed 544 */ 545 public BundleJobInfo resumeJobs(String filter, int start, int len) throws BundleEngineException { 546 try { 547 Map<String, List<String>> filterList = parseFilter(filter); 548 BundleJobInfo bundleJobInfo = new BulkBundleXCommand(filterList, start, len, OperationType.Resume).call(); 549 if (bundleJobInfo == null) { 550 return new BundleJobInfo(new ArrayList<BundleJobBean>(), 0, 0, 0); 551 } 552 return bundleJobInfo; 553 } 554 catch (CommandException ex) { 555 throw new BundleEngineException(ex); 556 } 557 } 558}