001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie; 019 020 import java.io.IOException; 021 import java.io.Writer; 022 import java.text.ParseException; 023 import java.util.ArrayList; 024 import java.util.Date; 025 import java.util.HashMap; 026 import java.util.HashSet; 027 import java.util.List; 028 import java.util.Map; 029 import java.util.Set; 030 import java.util.StringTokenizer; 031 032 import org.apache.hadoop.conf.Configuration; 033 import org.apache.oozie.client.CoordinatorAction; 034 import org.apache.oozie.client.CoordinatorJob; 035 import org.apache.oozie.client.Job; 036 import org.apache.oozie.client.OozieClient; 037 import org.apache.oozie.client.WorkflowJob; 038 import org.apache.oozie.client.rest.BulkResponseImpl; 039 import org.apache.oozie.command.BulkJobsXCommand; 040 import org.apache.oozie.command.CommandException; 041 import org.apache.oozie.command.bundle.BundleJobChangeXCommand; 042 import org.apache.oozie.command.bundle.BundleJobResumeXCommand; 043 import org.apache.oozie.command.bundle.BundleJobSuspendXCommand; 044 import org.apache.oozie.command.bundle.BundleJobXCommand; 045 import org.apache.oozie.command.bundle.BundleJobsXCommand; 046 import org.apache.oozie.command.bundle.BundleKillXCommand; 047 import org.apache.oozie.command.bundle.BundleRerunXCommand; 048 import org.apache.oozie.command.bundle.BundleStartXCommand; 049 import org.apache.oozie.command.bundle.BundleSubmitXCommand; 050 import org.apache.oozie.service.DagXLogInfoService; 051 import org.apache.oozie.service.Services; 052 import org.apache.oozie.service.XLogService; 053 import org.apache.oozie.util.DateUtils; 054 import org.apache.oozie.util.ParamChecker; 055 import org.apache.oozie.util.XLog; 056 import org.apache.oozie.util.XLogStreamer; 057 058 public class BundleEngine extends BaseEngine { 059 /** 060 * Create a system Bundle engine, with no user and no group. 061 */ 062 public BundleEngine() { 063 } 064 065 /** 066 * Create a Bundle engine to perform operations on behave of a user. 067 * 068 * @param user user name. 069 * @param authToken the authentication token. 070 */ 071 public BundleEngine(String user, String authToken) { 072 this.user = ParamChecker.notEmpty(user, "user"); 073 this.authToken = ParamChecker.notEmpty(authToken, "authToken"); 074 } 075 076 /* (non-Javadoc) 077 * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String) 078 */ 079 @Override 080 public void change(String jobId, String changeValue) throws BundleEngineException { 081 try { 082 BundleJobChangeXCommand change = new BundleJobChangeXCommand(jobId, changeValue); 083 change.call(); 084 } 085 catch (CommandException ex) { 086 throw new BundleEngineException(ex); 087 } 088 } 089 090 /* (non-Javadoc) 091 * @see org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration) 092 */ 093 @Override 094 public String dryRunSubmit(Configuration conf) throws BundleEngineException { 095 BundleSubmitXCommand submit = new BundleSubmitXCommand(true, conf, getAuthToken()); 096 try { 097 String jobId = submit.call(); 098 return jobId; 099 } 100 catch (CommandException ex) { 101 throw new BundleEngineException(ex); 102 } 103 } 104 105 /* (non-Javadoc) 106 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String) 107 */ 108 @Override 109 public CoordinatorJob getCoordJob(String jobId) throws BundleEngineException { 110 throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from BundleEngine")); 111 } 112 113 public BundleJobBean getBundleJob(String jobId) throws BundleEngineException { 114 try { 115 return new BundleJobXCommand(jobId).call(); 116 } 117 catch (CommandException ex) { 118 throw new BundleEngineException(ex); 119 } 120 } 121 122 /* (non-Javadoc) 123 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, int, int) 124 */ 125 @Override 126 public CoordinatorJob getCoordJob(String jobId, String filter, int start, int length) throws BundleEngineException { 127 throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from BundleEngine")); 128 } 129 130 /* (non-Javadoc) 131 * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String) 132 */ 133 @Override 134 public String getDefinition(String jobId) throws BundleEngineException { 135 BundleJobBean job; 136 try { 137 job = new BundleJobXCommand(jobId).call(); 138 } 139 catch (CommandException ex) { 140 throw new BundleEngineException(ex); 141 } 142 return job.getOrigJobXml(); 143 } 144 145 /* (non-Javadoc) 146 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String) 147 */ 148 @Override 149 public WorkflowJob getJob(String jobId) throws BundleEngineException { 150 throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine")); 151 } 152 153 /* (non-Javadoc) 154 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int) 155 */ 156 @Override 157 public WorkflowJob getJob(String jobId, int start, int length) throws BundleEngineException { 158 throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine")); 159 } 160 161 /* (non-Javadoc) 162 * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String) 163 */ 164 @Override 165 public String getJobIdForExternalId(String externalId) throws BundleEngineException { 166 return null; 167 } 168 169 /* (non-Javadoc) 170 * @see org.apache.oozie.BaseEngine#kill(java.lang.String) 171 */ 172 @Override 173 public void kill(String jobId) throws BundleEngineException { 174 try { 175 new BundleKillXCommand(jobId).call(); 176 } 177 catch (CommandException e) { 178 throw new BundleEngineException(e); 179 } 180 } 181 182 /* (non-Javadoc) 183 * @see org.apache.oozie.BaseEngine#reRun(java.lang.String, org.apache.hadoop.conf.Configuration) 184 */ 185 @Override 186 @Deprecated 187 public void reRun(String jobId, Configuration conf) throws BundleEngineException { 188 throw new BundleEngineException(new XException(ErrorCode.E0301, "rerun")); 189 } 190 191 /** 192 * Rerun Bundle actions for given rerunType 193 * 194 * @param jobId bundle job id 195 * @param coordScope the rerun scope for coordinator job names separated by "," 196 * @param dateScope the rerun scope for coordinator nominal times separated by "," 197 * @param refresh true if user wants to refresh input/outpur dataset urls 198 * @param noCleanup false if user wants to cleanup output events for given rerun actions 199 * @throws BaseEngineException thrown if failed to rerun 200 */ 201 public void reRun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) 202 throws BaseEngineException { 203 try { 204 new BundleRerunXCommand(jobId, coordScope, dateScope, refresh, noCleanup).call(); 205 } 206 catch (CommandException ex) { 207 throw new BaseEngineException(ex); 208 } 209 } 210 211 /* (non-Javadoc) 212 * @see org.apache.oozie.BaseEngine#resume(java.lang.String) 213 */ 214 @Override 215 public void resume(String jobId) throws BundleEngineException { 216 BundleJobResumeXCommand resume = new BundleJobResumeXCommand(jobId); 217 try { 218 resume.call(); 219 } 220 catch (CommandException ex) { 221 throw new BundleEngineException(ex); 222 } 223 } 224 225 /* (non-Javadoc) 226 * @see org.apache.oozie.BaseEngine#start(java.lang.String) 227 */ 228 @Override 229 public void start(String jobId) throws BundleEngineException { 230 try { 231 new BundleStartXCommand(jobId).call(); 232 } 233 catch (CommandException e) { 234 throw new BundleEngineException(e); 235 } 236 } 237 238 /* (non-Javadoc) 239 * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String, java.io.Writer) 240 */ 241 @Override 242 public void streamLog(String jobId, Writer writer) throws IOException, BundleEngineException { 243 XLogStreamer.Filter filter = new XLogStreamer.Filter(); 244 filter.setParameter(DagXLogInfoService.JOB, jobId); 245 246 BundleJobBean job; 247 try { 248 job = new BundleJobXCommand(jobId).call(); 249 } 250 catch (CommandException ex) { 251 throw new BundleEngineException(ex); 252 } 253 254 Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer); 255 } 256 257 /* (non-Javadoc) 258 * @see org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean) 259 */ 260 @Override 261 public String submitJob(Configuration conf, boolean startJob) throws BundleEngineException { 262 try { 263 String jobId = new BundleSubmitXCommand(conf, getAuthToken()).call(); 264 265 if (startJob) { 266 start(jobId); 267 } 268 return jobId; 269 } 270 catch (CommandException ex) { 271 throw new BundleEngineException(ex); 272 } 273 } 274 275 /* (non-Javadoc) 276 * @see org.apache.oozie.BaseEngine#suspend(java.lang.String) 277 */ 278 @Override 279 public void suspend(String jobId) throws BundleEngineException { 280 BundleJobSuspendXCommand suspend = new BundleJobSuspendXCommand(jobId); 281 try { 282 suspend.call(); 283 } 284 catch (CommandException ex) { 285 throw new BundleEngineException(ex); 286 } 287 } 288 289 private static final Set<String> FILTER_NAMES = new HashSet<String>(); 290 291 static { 292 FILTER_NAMES.add(OozieClient.FILTER_USER); 293 FILTER_NAMES.add(OozieClient.FILTER_NAME); 294 FILTER_NAMES.add(OozieClient.FILTER_GROUP); 295 FILTER_NAMES.add(OozieClient.FILTER_STATUS); 296 FILTER_NAMES.add(OozieClient.FILTER_ID); 297 } 298 299 /** 300 * Get bundle jobs 301 * 302 * @param filter the filter string 303 * @param start start location for paging 304 * @param len total length to get 305 * @return bundle job info 306 * @throws BundleEngineException thrown if failed to get bundle job info 307 */ 308 public BundleJobInfo getBundleJobs(String filter, int start, int len) throws BundleEngineException { 309 Map<String, List<String>> filterList = parseFilter(filter); 310 311 try { 312 return new BundleJobsXCommand(filterList, start, len).call(); 313 } 314 catch (CommandException ex) { 315 throw new BundleEngineException(ex); 316 } 317 } 318 319 /** 320 * Parse filter string to a map with key = filter name and values = filter values 321 * 322 * @param filter the filter string 323 * @return filter key and value map 324 * @throws CoordinatorEngineException thrown if failed to parse filter string 325 */ 326 private Map<String, List<String>> parseFilter(String filter) throws BundleEngineException { 327 Map<String, List<String>> map = new HashMap<String, List<String>>(); 328 if (filter != null) { 329 StringTokenizer st = new StringTokenizer(filter, ";"); 330 while (st.hasMoreTokens()) { 331 String token = st.nextToken(); 332 if (token.contains("=")) { 333 String[] pair = token.split("="); 334 if (pair.length != 2) { 335 throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs"); 336 } 337 if (!FILTER_NAMES.contains(pair[0])) { 338 throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]", 339 pair[0])); 340 } 341 if (pair[0].equals("status")) { 342 try { 343 Job.Status.valueOf(pair[1]); 344 } 345 catch (IllegalArgumentException ex) { 346 throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format( 347 "invalid status [{0}]", pair[1])); 348 } 349 } 350 List<String> list = map.get(pair[0]); 351 if (list == null) { 352 list = new ArrayList<String>(); 353 map.put(pair[0], list); 354 } 355 list.add(pair[1]); 356 } 357 else { 358 throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs"); 359 } 360 } 361 } 362 return map; 363 } 364 365 /** 366 * Get bulk job response 367 * 368 * @param filter the filter string 369 * @param start start location for paging 370 * @param len total length to get 371 * @return bulk job info 372 * @throws BundleEngineException thrown if failed to get bulk job info 373 */ 374 public BulkResponseInfo getBulkJobs(String bulkFilter, int start, int len) throws BundleEngineException { 375 Map<String,List<String>> bulkRequestMap = parseBulkFilter(bulkFilter); 376 try { 377 return new BulkJobsXCommand(bulkRequestMap, start, len).call(); 378 } 379 catch (CommandException ex) { 380 throw new BundleEngineException(ex); 381 } 382 } 383 384 /** 385 * Parse filter string to a map with key = filter name and values = filter values 386 * Allowed keys are defined as constants on top 387 * 388 * @param filter the filter string 389 * @return filter key-value pair map 390 * @throws BundleEngineException thrown if failed to parse filter string 391 */ 392 public static Map<String,List<String>> parseBulkFilter(String bulkParams) throws BundleEngineException { 393 394 Map<String,List<String>> bulkFilter = new HashMap<String,List<String>>(); 395 // Functionality can be extended to different job levels - TODO extend filter parser and query 396 // E.g. String filterlevel = "coordinatoraction"; BulkResponseImpl.BULK_FILTER_LEVEL 397 if (bulkFilter != null) { 398 StringTokenizer st = new StringTokenizer(bulkParams, ";"); 399 while (st.hasMoreTokens()) { 400 String token = st.nextToken(); 401 if (token.contains("=")) { 402 String[] pair = token.split("="); 403 if (pair.length != 2) { 404 throw new BundleEngineException(ErrorCode.E0420, token, 405 "elements must be name=value pairs"); 406 } 407 pair[0] = pair[0].toLowerCase(); 408 String[] values = pair[1].split(","); 409 if (!BulkResponseImpl.BULK_FILTER_NAMES.contains(pair[0])) { 410 throw new BundleEngineException(ErrorCode.E0420, token, XLog.format("invalid parameter name [{0}]", 411 pair[0])); 412 } 413 // special check and processing for time related params 414 if (pair[0].contains("time")) { 415 try { 416 DateUtils.parseDateUTC(pair[1]); 417 } 418 catch (ParseException e) { 419 throw new BundleEngineException(ErrorCode.E0420, token, XLog.format( 420 "invalid value [{0}] for time. A datetime value of pattern [{1}] is expected", pair[1], 421 DateUtils.ISO8601_UTC_MASK)); 422 } 423 } 424 // special check for action status param 425 // TODO: when extended for levels other than coord action, check against corresponding level's Status values 426 if (pair[0].equals(BulkResponseImpl.BULK_FILTER_STATUS)) { 427 for(String value : values) { 428 try { 429 CoordinatorAction.Status.valueOf(value); 430 } 431 catch (IllegalArgumentException ex) { 432 throw new BundleEngineException(ErrorCode.E0420, token, XLog.format( 433 "invalid action status [{0}]", value)); 434 } 435 } 436 } 437 // eventually adding into map for all cases e.g. names, times, status 438 List<String> list = bulkFilter.get(pair[0]); 439 if (list == null) { 440 list = new ArrayList<String>(); 441 bulkFilter.put(pair[0], list); 442 } 443 for(String value : values) { 444 value = value.trim(); 445 if(value.isEmpty()) { 446 throw new BundleEngineException(ErrorCode.E0420, token, "value is empty or whitespace"); 447 } 448 list.add(value); 449 } 450 } else { 451 throw new BundleEngineException(ErrorCode.E0420, token, "elements must be name=value pairs"); 452 } 453 } 454 if(!bulkFilter.containsKey(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME)) { 455 throw new BundleEngineException(ErrorCode.E0305, BulkResponseImpl.BULK_FILTER_BUNDLE_NAME); 456 } 457 } 458 return bulkFilter; 459 } 460 }