/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie;

import java.io.IOException;
import java.io.Writer;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.rest.BulkResponseImpl;
import org.apache.oozie.command.BulkJobsXCommand;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.bundle.BundleJobChangeXCommand;
import org.apache.oozie.command.bundle.BundleJobResumeXCommand;
import org.apache.oozie.command.bundle.BundleJobSuspendXCommand;
import org.apache.oozie.command.bundle.BundleJobXCommand;
import org.apache.oozie.command.bundle.BundleJobsXCommand;
import org.apache.oozie.command.bundle.BundleKillXCommand;
import org.apache.oozie.command.bundle.BundleRerunXCommand;
import org.apache.oozie.command.bundle.BundleStartXCommand;
import org.apache.oozie.command.bundle.BundleSubmitXCommand;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogStreamingService;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.XLogFilter;
import org.apache.oozie.util.XLogUserFilterParam;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;

import com.google.common.annotations.VisibleForTesting;

/**
 * Engine that exposes bundle job operations by delegating to the bundle X-commands.
 * <p>
 * The workflow and coordinator lookups inherited from {@link BaseEngine} are not supported here and are
 * rejected with {@link ErrorCode#E0301}. Failures of the underlying commands are reported as
 * {@link BundleEngineException} unless a method states otherwise.
 */
public class BundleEngine extends BaseEngine {
    /**
     * Create a system Bundle engine, with no user and no group.
     */
    public BundleEngine() {
    }

    /**
     * Create a Bundle engine to perform operations on behalf of a user.
     *
     * @param user user name. It is passed through {@link ParamChecker#notEmpty(String, String)} before being
     *        stored (presumably rejecting null/empty names - confirm against ParamChecker).
     */
    public BundleEngine(String user) {
        this.user = ParamChecker.notEmpty(user, "user");
    }

    /**
     * Change a bundle job by running a {@link BundleJobChangeXCommand}.
     *
     * @param jobId bundle job id
     * @param changeValue change specification, handed to the command unchanged
     * @throws BundleEngineException wrapping any {@link CommandException} raised by the command
     * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
     */
    @Override
    public void change(String jobId, String changeValue) throws BundleEngineException {
        try {
            BundleJobChangeXCommand change = new BundleJobChangeXCommand(jobId, changeValue);
            change.call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Run a {@link BundleSubmitXCommand} built with its first argument set to <code>true</code>
     * (presumably the dry-run flag - confirm against the command) and return its result.
     *
     * @param conf job configuration
     * @return the string returned by the submit command
     * @throws BundleEngineException wrapping any {@link CommandException} raised by the command
     * @see org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
     */
    @Override
    public String dryRunSubmit(Configuration conf) throws BundleEngineException {
        BundleSubmitXCommand submit = new BundleSubmitXCommand(true, conf);
        try {
            return submit.call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Not supported by the bundle engine.
     *
     * @param jobId ignored
     * @return never returns normally
     * @throws BundleEngineException always, with error code E0301
     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
     */
    @Override
    public CoordinatorJob getCoordJob(String jobId) throws BundleEngineException {
        throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from BundleEngine"));
    }

    /**
     * Load a bundle job through a {@link BundleJobXCommand}.
     *
     * @param jobId bundle job id
     * @return the bundle job bean returned by the command
     * @throws BundleEngineException wrapping any {@link CommandException} raised by the command
     */
    public BundleJobBean getBundleJob(String jobId) throws BundleEngineException {
        try {
            return new BundleJobXCommand(jobId).call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Not supported by the bundle engine.
     *
     * @param jobId ignored
     * @param filter ignored
     * @param start ignored
     * @param length ignored
     * @param desc ignored
     * @return never returns normally
     * @throws BundleEngineException always, with error code E0301
     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, java.lang.String, int, int, boolean)
     */
    @Override
    public CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc)
            throws BundleEngineException {
        throw new BundleEngineException(new XException(ErrorCode.E0301,
                "cannot get a coordinator job from BundleEngine"));
    }

    /**
     * Return the original job XML of a bundle job.
     *
     * @param jobId bundle job id
     * @return the value of {@link BundleJobBean#getOrigJobXml()} for the loaded job
     * @throws BundleEngineException wrapping any {@link CommandException} raised while loading the job
     * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
     */
    @Override
    public String getDefinition(String jobId) throws BundleEngineException {
        BundleJobBean job;
        try {
            job = new BundleJobXCommand(jobId).call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
        return job.getOrigJobXml();
    }

    /**
     * Not supported by the bundle engine.
     *
     * @param jobId ignored
     * @return never returns normally
     * @throws BundleEngineException always, with error code E0301
     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
     */
    @Override
    public WorkflowJob getJob(String jobId) throws BundleEngineException {
        throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
    }

    /**
     * Not supported by the bundle engine.
     *
     * @param jobId ignored
     * @param start ignored
     * @param length ignored
     * @return never returns normally
     * @throws BundleEngineException always, with error code E0301
     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
     */
    @Override
    public WorkflowJob getJob(String jobId, int start, int length) throws BundleEngineException {
        throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
    }

    /**
     * The bundle engine performs no external-id lookup.
     *
     * @param externalId ignored
     * @return always <code>null</code>
     * @throws BundleEngineException declared by the overridden method; never thrown by this implementation
     * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
     */
    @Override
    public String getJobIdForExternalId(String externalId) throws BundleEngineException {
        return null;
    }

    /**
     * Kill a bundle job by running a {@link BundleKillXCommand}.
     *
     * @param jobId bundle job id
     * @throws BundleEngineException wrapping any {@link CommandException} raised by the command
     * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
     */
    @Override
    public void kill(String jobId) throws BundleEngineException {
        try {
            new BundleKillXCommand(jobId).call();
        }
        catch (CommandException e) {
            throw new BundleEngineException(e);
        }
    }

    /**
     * Not supported by the bundle engine; use
     * {@link #reRun(String, String, String, boolean, boolean)} instead.
     *
     * @param jobId ignored
     * @param conf ignored
     * @throws BundleEngineException always, with error code E0301
     * @see org.apache.oozie.BaseEngine#reRun(java.lang.String, org.apache.hadoop.conf.Configuration)
     */
    @Override
    @Deprecated
    public void reRun(String jobId, Configuration conf) throws BundleEngineException {
        throw new BundleEngineException(new XException(ErrorCode.E0301, "rerun"));
    }

    /**
     * Rerun Bundle actions for given rerunType
     *
     * @param jobId bundle job id
     * @param coordScope the rerun scope for coordinator job names separated by ","
     * @param dateScope the rerun scope for coordinator nominal times separated by ","
     * @param refresh true if user wants to refresh input/output dataset urls
     * @param noCleanup false if user wants to cleanup output events for given rerun actions
     * @throws BaseEngineException thrown if failed to rerun
     */
    public void reRun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup)
            throws BaseEngineException {
        try {
            new BundleRerunXCommand(jobId, coordScope, dateScope, refresh, noCleanup).call();
        }
        catch (CommandException ex) {
            throw new BaseEngineException(ex);
        }
    }

    /**
     * Resume a bundle job by running a {@link BundleJobResumeXCommand}.
     *
     * @param jobId bundle job id
     * @throws BundleEngineException wrapping any {@link CommandException} raised by the command
     * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
     */
    @Override
    public void resume(String jobId) throws BundleEngineException {
        BundleJobResumeXCommand resume = new BundleJobResumeXCommand(jobId);
        try {
            resume.call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Start a bundle job by running a {@link BundleStartXCommand}.
     *
     * @param jobId bundle job id
     * @throws BundleEngineException wrapping any {@link CommandException} raised by the command
     * @see org.apache.oozie.BaseEngine#start(java.lang.String)
     */
    @Override
    public void start(String jobId) throws BundleEngineException {
        try {
            new BundleStartXCommand(jobId).call();
        }
        catch (CommandException e) {
            throw new BundleEngineException(e);
        }
    }

    /**
     * Stream the log of a bundle job to the given writer.
     * <p>
     * The log window starts at the job's created time. It ends at the job's last modified time when the job
     * is in a terminal status and that time is set, and at the current time otherwise.
     * <p>
     * Every failure inside this method, including a {@link CommandException} from the job lookup, is
     * rethrown wrapped in an {@link IOException}.
     *
     * @param jobId bundle job id
     * @param writer destination handed to the {@link XLogStreamingService}
     * @param params request parameters, used to build the user log filter and forwarded to the streaming service
     * @throws IOException wrapping any exception raised while loading the job or streaming the log
     * @throws BundleEngineException declared by the overridden method; this implementation does not throw it
     *         directly
     * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String, java.io.Writer, java.util.Map)
     */
    @Override
    public void streamLog(String jobId, Writer writer, Map<String, String[]> params) throws IOException,
            BundleEngineException {

        BundleJobBean job;
        try {
            XLogFilter filter = new XLogFilter(new XLogUserFilterParam(params));
            filter.setParameter(DagXLogInfoService.JOB, jobId);
            job = new BundleJobXCommand(jobId).call();
            Date lastTime = null;
            if (job.isTerminalStatus()) {
                lastTime = job.getLastModifiedTime();
            }
            // non-terminal job, or terminal job without a last modified time: stream up to "now"
            if (lastTime == null) {
                lastTime = new Date();
            }
            Services.get().get(XLogStreamingService.class).streamLog(filter, job.getCreatedTime(), lastTime, writer, params);
        }
        catch (Exception ex) {
            throw new IOException(ex);
        }
    }

    /**
     * Submit a bundle job through a {@link BundleSubmitXCommand} and optionally start it.
     *
     * @param conf job configuration
     * @param startJob when true, {@link #start(String)} is invoked with the id returned by the submit command
     * @return the job id returned by the submit command
     * @throws BundleEngineException wrapping any {@link CommandException} raised by the submit command, or
     *         propagated from {@link #start(String)}
     * @see org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean)
     */
    @Override
    public String submitJob(Configuration conf, boolean startJob) throws BundleEngineException {
        try {
            String jobId = new BundleSubmitXCommand(conf).call();

            if (startJob) {
                start(jobId);
            }
            return jobId;
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Suspend a bundle job by running a {@link BundleJobSuspendXCommand}.
     *
     * @param jobId bundle job id
     * @throws BundleEngineException wrapping any {@link CommandException} raised by the command
     * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
     */
    @Override
    public void suspend(String jobId) throws BundleEngineException {
        BundleJobSuspendXCommand suspend = new BundleJobSuspendXCommand(jobId);
        try {
            suspend.call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /** Filter names accepted by {@link #parseFilter(String)}. */
    private static final Set<String> FILTER_NAMES = new HashSet<String>();

    static {
        FILTER_NAMES.add(OozieClient.FILTER_USER);
        FILTER_NAMES.add(OozieClient.FILTER_NAME);
        FILTER_NAMES.add(OozieClient.FILTER_GROUP);
        FILTER_NAMES.add(OozieClient.FILTER_STATUS);
        FILTER_NAMES.add(OozieClient.FILTER_ID);
    }

    /**
     * Get bundle jobs
     *
     * @param filter the filter string
     * @param start start location for paging
     * @param len total length to get
     * @return bundle job info
     * @throws BundleEngineException thrown if failed to get bundle job info
     */
    public BundleJobInfo getBundleJobs(String filter, int start, int len) throws BundleEngineException {
        Map<String, List<String>> filterList = parseFilter(filter);

        try {
            return new BundleJobsXCommand(filterList, start, len).call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Parse filter string to a map with key = filter name and values = filter values
     * <p>
     * The filter is a ";"-separated list of <code>name=value</code> elements. Names must be one of
     * {@link #FILTER_NAMES}; values of the <code>status</code> filter must be {@link Job.Status} names.
     * Repeated names accumulate their values in order of appearance. A null filter yields an empty map.
     *
     * @param filter the filter string
     * @return filter key and value map
     * @throws BundleEngineException thrown if failed to parse filter string
     */
    @VisibleForTesting
    Map<String, List<String>> parseFilter(String filter) throws BundleEngineException {
        Map<String, List<String>> map = new HashMap<String, List<String>>();
        if (filter != null) {
            StringTokenizer st = new StringTokenizer(filter, ";");
            while (st.hasMoreTokens()) {
                String token = st.nextToken();
                if (token.contains("=")) {
                    String[] pair = token.split("=");
                    // anything but exactly one name and one value (e.g. "a=", "a=b=c") is malformed
                    if (pair.length != 2) {
                        throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
                    }
                    if (!FILTER_NAMES.contains(pair[0])) {
                        throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
                                pair[0]));
                    }
                    if (pair[0].equals("status")) {
                        try {
                            Job.Status.valueOf(pair[1]);
                        }
                        catch (IllegalArgumentException ex) {
                            throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format(
                                    "invalid status [{0}]", pair[1]));
                        }
                    }
                    List<String> list = map.get(pair[0]);
                    if (list == null) {
                        list = new ArrayList<String>();
                        map.put(pair[0], list);
                    }
                    list.add(pair[1]);
                }
                else {
                    throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
                }
            }
        }
        return map;
    }

    /**
     * Get bulk job response
     *
     * @param bulkFilter the bulk filter string, parsed by {@link #parseBulkFilter(String)}
     * @param start start location for paging
     * @param len total length to get
     * @return bulk job info
     * @throws BundleEngineException thrown if failed to get bulk job info
     */
    public BulkResponseInfo getBulkJobs(String bulkFilter, int start, int len) throws BundleEngineException {
        Map<String,List<String>> bulkRequestMap = parseBulkFilter(bulkFilter);
        try {
            return new BulkJobsXCommand(bulkRequestMap, start, len).call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Parse the bulk filter string to a map with key = filter name and values = filter values.
     * Allowed keys are those in {@link BulkResponseImpl#BULK_FILTER_NAMES}.
     * <p>
     * The string is a ";"-separated list of <code>name=value[,value...]</code> elements. Names are
     * lower-cased before validation. For names containing "time" the whole value is validated with
     * {@link DateUtils#parseDateUTC(String)}. For {@link BulkResponseImpl#BULK_FILTER_STATUS} each
     * comma-separated value must be a {@link CoordinatorAction.Status} name. Values are trimmed before
     * being stored, and repeated names accumulate their values.
     * <p>
     * The {@link BulkResponseImpl#BULK_FILTER_BUNDLE} entry is mandatory. A null <code>bulkParams</code>
     * is treated like an empty string and therefore fails with {@link ErrorCode#E0305}.
     *
     * @param bulkParams the bulk filter string, may be null
     * @return filter key-value pair map
     * @throws BundleEngineException thrown if failed to parse filter string (E0420) or if the bundle
     *         filter is missing (E0305)
     */
    public static Map<String,List<String>> parseBulkFilter(String bulkParams) throws BundleEngineException {

        Map<String,List<String>> bulkFilter = new HashMap<String,List<String>>();
        // Functionality can be extended to different job levels - TODO extend filter parser and query
        // E.g. String filterlevel = "coordinatoraction"; BulkResponseImpl.BULK_FILTER_LEVEL
        // Guard on the input string, not on the freshly created map: a null input would otherwise make
        // the StringTokenizer constructor throw a NullPointerException.
        if (bulkParams != null) {
            StringTokenizer st = new StringTokenizer(bulkParams, ";");
            while (st.hasMoreTokens()) {
                String token = st.nextToken();
                if (token.contains("=")) {
                    String[] pair = token.split("=");
                    if (pair.length != 2) {
                        throw new BundleEngineException(ErrorCode.E0420, token,
                                "elements must be name=value pairs");
                    }
                    pair[0] = pair[0].toLowerCase();
                    String[] values = pair[1].split(",");
                    if (!BulkResponseImpl.BULK_FILTER_NAMES.contains(pair[0])) {
                        throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
                                "invalid parameter name [{0}]", pair[0]));
                    }
                    // special check and processing for time related params
                    if (pair[0].contains("time")) {
                        try {
                            DateUtils.parseDateUTC(pair[1]);
                        }
                        catch (ParseException e) {
                            throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
                                    "invalid value [{0}] for time. A datetime value of pattern [{1}] is expected", pair[1],
                                    DateUtils.ISO8601_UTC_MASK));
                        }
                    }
                    // special check for action status param
                    // TODO: when extended for levels other than coord action, check against corresponding level's Status values
                    if (pair[0].equals(BulkResponseImpl.BULK_FILTER_STATUS)) {
                        for(String value : values) {
                            try {
                                CoordinatorAction.Status.valueOf(value);
                            }
                            catch (IllegalArgumentException ex) {
                                throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
                                        "invalid action status [{0}]", value));
                            }
                        }
                    }
                    // eventually adding into map for all cases e.g. names, times, status
                    List<String> list = bulkFilter.get(pair[0]);
                    if (list == null) {
                        list = new ArrayList<String>();
                        bulkFilter.put(pair[0], list);
                    }
                    for(String value : values) {
                        value = value.trim();
                        if(value.isEmpty()) {
                            throw new BundleEngineException(ErrorCode.E0420, token, "value is empty or whitespace");
                        }
                        list.add(value);
                    }
                } else {
                    throw new BundleEngineException(ErrorCode.E0420, token, "elements must be name=value pairs");
                }
            }
        }
        // The bundle filter is mandatory; checked outside the null guard so that a null input is
        // reported the same way as an empty one.
        if (!bulkFilter.containsKey(BulkResponseImpl.BULK_FILTER_BUNDLE)) {
            throw new BundleEngineException(ErrorCode.E0305, BulkResponseImpl.BULK_FILTER_BUNDLE);
        }
        return bulkFilter;
    }
}