001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie; 019 020 import java.io.IOException; 021 import java.io.Writer; 022 import java.text.ParseException; 023 import java.util.ArrayList; 024 import java.util.Date; 025 import java.util.HashMap; 026 import java.util.HashSet; 027 import java.util.List; 028 import java.util.Map; 029 import java.util.Set; 030 import java.util.StringTokenizer; 031 032 import org.apache.hadoop.conf.Configuration; 033 import org.apache.oozie.client.CoordinatorAction; 034 import org.apache.oozie.client.CoordinatorJob; 035 import org.apache.oozie.client.Job; 036 import org.apache.oozie.client.OozieClient; 037 import org.apache.oozie.client.WorkflowJob; 038 import org.apache.oozie.client.rest.BulkResponseImpl; 039 import org.apache.oozie.command.BulkJobsXCommand; 040 import org.apache.oozie.command.CommandException; 041 import org.apache.oozie.command.bundle.BundleJobChangeXCommand; 042 import org.apache.oozie.command.bundle.BundleJobResumeXCommand; 043 import org.apache.oozie.command.bundle.BundleJobSuspendXCommand; 044 import org.apache.oozie.command.bundle.BundleJobXCommand; 045 import org.apache.oozie.command.bundle.BundleJobsXCommand; 046 import org.apache.oozie.command.bundle.BundleKillXCommand; 047 import org.apache.oozie.command.bundle.BundleRerunXCommand; 048 import org.apache.oozie.command.bundle.BundleStartXCommand; 049 import org.apache.oozie.command.bundle.BundleSubmitXCommand; 050 import org.apache.oozie.service.DagXLogInfoService; 051 import org.apache.oozie.service.Services; 052 import org.apache.oozie.service.XLogService; 053 import org.apache.oozie.util.DateUtils; 054 import org.apache.oozie.util.ParamChecker; 055 import org.apache.oozie.util.XLog; 056 import org.apache.oozie.util.XLogStreamer; 057 058 import com.google.common.annotations.VisibleForTesting; 059 060 public class BundleEngine extends BaseEngine { 061 /** 062 * Create a system Bundle engine, with no user and no group. 063 */ 064 public BundleEngine() { 065 } 066 067 /** 068 * Create a Bundle engine to perform operations on behave of a user. 069 * 070 * @param user user name. 071 */ 072 public BundleEngine(String user) { 073 this.user = ParamChecker.notEmpty(user, "user"); 074 } 075 076 /* (non-Javadoc) 077 * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String) 078 */ 079 @Override 080 public void change(String jobId, String changeValue) throws BundleEngineException { 081 try { 082 BundleJobChangeXCommand change = new BundleJobChangeXCommand(jobId, changeValue); 083 change.call(); 084 } 085 catch (CommandException ex) { 086 throw new BundleEngineException(ex); 087 } 088 } 089 090 /* (non-Javadoc) 091 * @see org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration) 092 */ 093 @Override 094 public String dryRunSubmit(Configuration conf) throws BundleEngineException { 095 BundleSubmitXCommand submit = new BundleSubmitXCommand(true, conf); 096 try { 097 String jobId = submit.call(); 098 return jobId; 099 } 100 catch (CommandException ex) { 101 throw new BundleEngineException(ex); 102 } 103 } 104 105 /* (non-Javadoc) 106 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String) 107 */ 108 @Override 109 public CoordinatorJob getCoordJob(String jobId) throws BundleEngineException { 110 throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from BundleEngine")); 111 } 112 113 public BundleJobBean getBundleJob(String jobId) throws BundleEngineException { 114 try { 115 return new BundleJobXCommand(jobId).call(); 116 } 117 catch (CommandException ex) { 118 throw new BundleEngineException(ex); 119 } 120 } 121 122 /* (non-Javadoc) 123 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, int, int) 124 */ 125 @Override 126 public CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc) 127 throws BundleEngineException { 128 throw new BundleEngineException(new XException(ErrorCode.E0301, 129 "cannot get a coordinator job from BundleEngine")); 130 } 131 132 /* (non-Javadoc) 133 * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String) 134 */ 135 @Override 136 public String getDefinition(String jobId) throws BundleEngineException { 137 BundleJobBean job; 138 try { 139 job = new BundleJobXCommand(jobId).call(); 140 } 141 catch (CommandException ex) { 142 throw new BundleEngineException(ex); 143 } 144 return job.getOrigJobXml(); 145 } 146 147 /* (non-Javadoc) 148 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String) 149 */ 150 @Override 151 public WorkflowJob getJob(String jobId) throws BundleEngineException { 152 throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine")); 153 } 154 155 /* (non-Javadoc) 156 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int) 157 */ 158 @Override 159 public WorkflowJob getJob(String jobId, int start, int length) throws BundleEngineException { 160 throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine")); 161 } 162 163 /* (non-Javadoc) 164 * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String) 165 */ 166 @Override 167 public String getJobIdForExternalId(String externalId) throws BundleEngineException { 168 return null; 169 } 170 171 /* (non-Javadoc) 172 * @see org.apache.oozie.BaseEngine#kill(java.lang.String) 173 */ 174 @Override 175 public void kill(String jobId) throws BundleEngineException { 176 try { 177 new BundleKillXCommand(jobId).call(); 178 } 179 catch (CommandException e) { 180 throw new BundleEngineException(e); 181 } 182 } 183 184 /* (non-Javadoc) 185 * @see org.apache.oozie.BaseEngine#reRun(java.lang.String, org.apache.hadoop.conf.Configuration) 186 */ 187 @Override 188 @Deprecated 189 public void reRun(String jobId, Configuration conf) throws BundleEngineException { 190 throw new BundleEngineException(new XException(ErrorCode.E0301, "rerun")); 191 } 192 193 /** 194 * Rerun Bundle actions for given rerunType 195 * 196 * @param jobId bundle job id 197 * @param coordScope the rerun scope for coordinator job names separated by "," 198 * @param dateScope the rerun scope for coordinator nominal times separated by "," 199 * @param refresh true if user wants to refresh input/outpur dataset urls 200 * @param noCleanup false if user wants to cleanup output events for given rerun actions 201 * @throws BaseEngineException thrown if failed to rerun 202 */ 203 public void reRun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup) 204 throws BaseEngineException { 205 try { 206 new BundleRerunXCommand(jobId, coordScope, dateScope, refresh, noCleanup).call(); 207 } 208 catch (CommandException ex) { 209 throw new BaseEngineException(ex); 210 } 211 } 212 213 /* (non-Javadoc) 214 * @see org.apache.oozie.BaseEngine#resume(java.lang.String) 215 */ 216 @Override 217 public void resume(String jobId) throws BundleEngineException { 218 BundleJobResumeXCommand resume = new BundleJobResumeXCommand(jobId); 219 try { 220 resume.call(); 221 } 222 catch (CommandException ex) { 223 throw new BundleEngineException(ex); 224 } 225 } 226 227 /* (non-Javadoc) 228 * @see org.apache.oozie.BaseEngine#start(java.lang.String) 229 */ 230 @Override 231 public void start(String jobId) throws BundleEngineException { 232 try { 233 new BundleStartXCommand(jobId).call(); 234 } 235 catch (CommandException e) { 236 throw new BundleEngineException(e); 237 } 238 } 239 240 /* (non-Javadoc) 241 * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String, java.io.Writer) 242 */ 243 @Override 244 public void streamLog(String jobId, Writer writer) throws IOException, BundleEngineException { 245 XLogStreamer.Filter filter = new XLogStreamer.Filter(); 246 filter.setParameter(DagXLogInfoService.JOB, jobId); 247 248 BundleJobBean job; 249 try { 250 job = new BundleJobXCommand(jobId).call(); 251 } 252 catch (CommandException ex) { 253 throw new BundleEngineException(ex); 254 } 255 256 Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer); 257 } 258 259 /* (non-Javadoc) 260 * @see org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean) 261 */ 262 @Override 263 public String submitJob(Configuration conf, boolean startJob) throws BundleEngineException { 264 try { 265 String jobId = new BundleSubmitXCommand(conf).call(); 266 267 if (startJob) { 268 start(jobId); 269 } 270 return jobId; 271 } 272 catch (CommandException ex) { 273 throw new BundleEngineException(ex); 274 } 275 } 276 277 /* (non-Javadoc) 278 * @see org.apache.oozie.BaseEngine#suspend(java.lang.String) 279 */ 280 @Override 281 public void suspend(String jobId) throws BundleEngineException { 282 BundleJobSuspendXCommand suspend = new BundleJobSuspendXCommand(jobId); 283 try { 284 suspend.call(); 285 } 286 catch (CommandException ex) { 287 throw new BundleEngineException(ex); 288 } 289 } 290 291 private static final Set<String> FILTER_NAMES = new HashSet<String>(); 292 293 static { 294 FILTER_NAMES.add(OozieClient.FILTER_USER); 295 FILTER_NAMES.add(OozieClient.FILTER_NAME); 296 FILTER_NAMES.add(OozieClient.FILTER_GROUP); 297 FILTER_NAMES.add(OozieClient.FILTER_STATUS); 298 FILTER_NAMES.add(OozieClient.FILTER_ID); 299 } 300 301 /** 302 * Get bundle jobs 303 * 304 * @param filter the filter string 305 * @param start start location for paging 306 * @param len total length to get 307 * @return bundle job info 308 * @throws BundleEngineException thrown if failed to get bundle job info 309 */ 310 public BundleJobInfo getBundleJobs(String filter, int start, int len) throws BundleEngineException { 311 Map<String, List<String>> filterList = parseFilter(filter); 312 313 try { 314 return new BundleJobsXCommand(filterList, start, len).call(); 315 } 316 catch (CommandException ex) { 317 throw new BundleEngineException(ex); 318 } 319 } 320 321 /** 322 * Parse filter string to a map with key = filter name and values = filter values 323 * 324 * @param filter the filter string 325 * @return filter key and value map 326 * @throws CoordinatorEngineException thrown if failed to parse filter string 327 */ 328 @VisibleForTesting 329 Map<String, List<String>> parseFilter(String filter) throws BundleEngineException { 330 Map<String, List<String>> map = new HashMap<String, List<String>>(); 331 if (filter != null) { 332 StringTokenizer st = new StringTokenizer(filter, ";"); 333 while (st.hasMoreTokens()) { 334 String token = st.nextToken(); 335 if (token.contains("=")) { 336 String[] pair = token.split("="); 337 if (pair.length != 2) { 338 throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs"); 339 } 340 if (!FILTER_NAMES.contains(pair[0])) { 341 throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]", 342 pair[0])); 343 } 344 if (pair[0].equals("status")) { 345 try { 346 Job.Status.valueOf(pair[1]); 347 } 348 catch (IllegalArgumentException ex) { 349 throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format( 350 "invalid status [{0}]", pair[1])); 351 } 352 } 353 List<String> list = map.get(pair[0]); 354 if (list == null) { 355 list = new ArrayList<String>(); 356 map.put(pair[0], list); 357 } 358 list.add(pair[1]); 359 } 360 else { 361 throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs"); 362 } 363 } 364 } 365 return map; 366 } 367 368 /** 369 * Get bulk job response 370 * 371 * @param filter the filter string 372 * @param start start location for paging 373 * @param len total length to get 374 * @return bulk job info 375 * @throws BundleEngineException thrown if failed to get bulk job info 376 */ 377 public BulkResponseInfo getBulkJobs(String bulkFilter, int start, int len) throws BundleEngineException { 378 Map<String,List<String>> bulkRequestMap = parseBulkFilter(bulkFilter); 379 try { 380 return new BulkJobsXCommand(bulkRequestMap, start, len).call(); 381 } 382 catch (CommandException ex) { 383 throw new BundleEngineException(ex); 384 } 385 } 386 387 /** 388 * Parse filter string to a map with key = filter name and values = filter values 389 * Allowed keys are defined as constants on top 390 * 391 * @param filter the filter string 392 * @return filter key-value pair map 393 * @throws BundleEngineException thrown if failed to parse filter string 394 */ 395 public static Map<String,List<String>> parseBulkFilter(String bulkParams) throws BundleEngineException { 396 397 Map<String,List<String>> bulkFilter = new HashMap<String,List<String>>(); 398 // Functionality can be extended to different job levels - TODO extend filter parser and query 399 // E.g. String filterlevel = "coordinatoraction"; BulkResponseImpl.BULK_FILTER_LEVEL 400 if (bulkFilter != null) { 401 StringTokenizer st = new StringTokenizer(bulkParams, ";"); 402 while (st.hasMoreTokens()) { 403 String token = st.nextToken(); 404 if (token.contains("=")) { 405 String[] pair = token.split("="); 406 if (pair.length != 2) { 407 throw new BundleEngineException(ErrorCode.E0420, token, 408 "elements must be name=value pairs"); 409 } 410 pair[0] = pair[0].toLowerCase(); 411 String[] values = pair[1].split(","); 412 if (!BulkResponseImpl.BULK_FILTER_NAMES.contains(pair[0])) { 413 throw new BundleEngineException(ErrorCode.E0420, token, XLog.format("invalid parameter name [{0}]", 414 pair[0])); 415 } 416 // special check and processing for time related params 417 if (pair[0].contains("time")) { 418 try { 419 DateUtils.parseDateUTC(pair[1]); 420 } 421 catch (ParseException e) { 422 throw new BundleEngineException(ErrorCode.E0420, token, XLog.format( 423 "invalid value [{0}] for time. A datetime value of pattern [{1}] is expected", pair[1], 424 DateUtils.ISO8601_UTC_MASK)); 425 } 426 } 427 // special check for action status param 428 // TODO: when extended for levels other than coord action, check against corresponding level's Status values 429 if (pair[0].equals(BulkResponseImpl.BULK_FILTER_STATUS)) { 430 for(String value : values) { 431 try { 432 CoordinatorAction.Status.valueOf(value); 433 } 434 catch (IllegalArgumentException ex) { 435 throw new BundleEngineException(ErrorCode.E0420, token, XLog.format( 436 "invalid action status [{0}]", value)); 437 } 438 } 439 } 440 // eventually adding into map for all cases e.g. names, times, status 441 List<String> list = bulkFilter.get(pair[0]); 442 if (list == null) { 443 list = new ArrayList<String>(); 444 bulkFilter.put(pair[0], list); 445 } 446 for(String value : values) { 447 value = value.trim(); 448 if(value.isEmpty()) { 449 throw new BundleEngineException(ErrorCode.E0420, token, "value is empty or whitespace"); 450 } 451 list.add(value); 452 } 453 } else { 454 throw new BundleEngineException(ErrorCode.E0420, token, "elements must be name=value pairs"); 455 } 456 } 457 if(!bulkFilter.containsKey(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME)) { 458 throw new BundleEngineException(ErrorCode.E0305, BulkResponseImpl.BULK_FILTER_BUNDLE_NAME); 459 } 460 } 461 return bulkFilter; 462 } 463 }