/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie;

import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.bundle.BundleJobChangeXCommand;
import org.apache.oozie.command.bundle.BundleJobResumeXCommand;
import org.apache.oozie.command.bundle.BundleJobSuspendXCommand;
import org.apache.oozie.command.bundle.BundleJobXCommand;
import org.apache.oozie.command.bundle.BundleJobsXCommand;
import org.apache.oozie.command.bundle.BundleKillXCommand;
import org.apache.oozie.command.bundle.BundleRerunXCommand;
import org.apache.oozie.command.bundle.BundleStartXCommand;
import org.apache.oozie.command.bundle.BundleSubmitXCommand;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogService;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XLogStreamer;

/**
 * Engine for bundle jobs.
 * <p>
 * Each supported operation builds the matching bundle X-command, calls it, and rewraps any
 * {@link CommandException} as a {@link BundleEngineException}. The workflow- and
 * coordinator-specific operations inherited from {@link BaseEngine} are rejected with
 * {@link ErrorCode#E0301}.
 */
public class BundleEngine extends BaseEngine {

    /**
     * Create a system Bundle engine, with no user and no group.
     */
    public BundleEngine() {
    }

    /**
     * Create a Bundle engine to perform operations on behalf of a user.
     *
     * @param user user name.
     * @param authToken the authentication token.
     */
    public BundleEngine(String user, String authToken) {
        this.user = ParamChecker.notEmpty(user, "user");
        this.authToken = ParamChecker.notEmpty(authToken, "authToken");
    }

    /**
     * Apply a change to a bundle job by calling {@link BundleJobChangeXCommand}.
     *
     * @param jobId bundle job id
     * @param changeValue the change value, passed through to the command unmodified
     * @throws BundleEngineException thrown if the change command fails
     * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
     */
    @Override
    public void change(String jobId, String changeValue) throws BundleEngineException {
        try {
            new BundleJobChangeXCommand(jobId, changeValue).call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Call {@link BundleSubmitXCommand} with its leading boolean argument set to {@code true}
     * (presumably the dry-run flag; confirm against the command's constructor).
     *
     * @param conf job configuration
     * @param startJob not used by this method
     * @return the value returned by the submit command
     * @throws BundleEngineException thrown if the submit command fails
     * @see org.apache.oozie.BaseEngine#dryrunSubmit(org.apache.hadoop.conf.Configuration, boolean)
     */
    @Override
    public String dryrunSubmit(Configuration conf, boolean startJob) throws BundleEngineException {
        BundleSubmitXCommand submit = new BundleSubmitXCommand(true, conf, getAuthToken());
        try {
            return submit.call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Not supported by the bundle engine.
     *
     * @param jobId ignored
     * @return never returns normally
     * @throws BundleEngineException always, wrapping {@link ErrorCode#E0301}
     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
     */
    @Override
    public CoordinatorJob getCoordJob(String jobId) throws BundleEngineException {
        throw new BundleEngineException(new XException(ErrorCode.E0301));
    }

    /**
     * Get a bundle job by calling {@link BundleJobXCommand}.
     *
     * @param jobId bundle job id
     * @return the bundle job bean returned by the command
     * @throws BundleEngineException thrown if the command fails
     */
    public BundleJobBean getBundleJob(String jobId) throws BundleEngineException {
        try {
            return new BundleJobXCommand(jobId).call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Not supported by the bundle engine.
     *
     * @param jobId ignored
     * @param start ignored
     * @param length ignored
     * @return never returns normally
     * @throws BundleEngineException always, wrapping {@link ErrorCode#E0301}
     * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, int, int)
     */
    @Override
    public CoordinatorJob getCoordJob(String jobId, int start, int length) throws BundleEngineException {
        throw new BundleEngineException(new XException(ErrorCode.E0301));
    }

    /**
     * Get the original job XML of a bundle job.
     *
     * @param jobId bundle job id
     * @return the value of {@code getOrigJobXml()} on the loaded bundle job
     * @throws BundleEngineException thrown if the job cannot be loaded
     * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
     */
    @Override
    public String getDefinition(String jobId) throws BundleEngineException {
        BundleJobBean job;
        try {
            job = new BundleJobXCommand(jobId).call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
        return job.getOrigJobXml();
    }

    /**
     * Not supported by the bundle engine.
     *
     * @param jobId ignored
     * @return never returns normally
     * @throws BundleEngineException always, wrapping {@link ErrorCode#E0301}
     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
     */
    @Override
    public WorkflowJob getJob(String jobId) throws BundleEngineException {
        throw new BundleEngineException(new XException(ErrorCode.E0301));
    }

    /**
     * Not supported by the bundle engine.
     *
     * @param jobId ignored
     * @param start ignored
     * @param length ignored
     * @return never returns normally
     * @throws BundleEngineException always, wrapping {@link ErrorCode#E0301}
     * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
     */
    @Override
    public WorkflowJob getJob(String jobId, int start, int length) throws BundleEngineException {
        throw new BundleEngineException(new XException(ErrorCode.E0301));
    }

    /**
     * External-id lookup is not implemented for bundles; this method performs no lookup.
     *
     * @param externalId ignored
     * @return always {@code null}
     * @throws BundleEngineException declared for interface compatibility; never thrown here
     * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
     */
    @Override
    public String getJobIdForExternalId(String externalId) throws BundleEngineException {
        return null;
    }

    /**
     * Kill a bundle job by calling {@link BundleKillXCommand}.
     *
     * @param jobId bundle job id
     * @throws BundleEngineException thrown if the kill command fails
     * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
     */
    @Override
    public void kill(String jobId) throws BundleEngineException {
        try {
            new BundleKillXCommand(jobId).call();
        }
        catch (CommandException e) {
            throw new BundleEngineException(e);
        }
    }

    /**
     * Not supported by the bundle engine; use
     * {@link #reRun(String, String, String, boolean, boolean)} instead.
     *
     * @param jobId ignored
     * @param conf ignored
     * @throws BundleEngineException always, wrapping {@link ErrorCode#E0301}
     * @see org.apache.oozie.BaseEngine#reRun(java.lang.String, org.apache.hadoop.conf.Configuration)
     */
    @Override
    @Deprecated
    public void reRun(String jobId, Configuration conf) throws BundleEngineException {
        throw new BundleEngineException(new XException(ErrorCode.E0301));
    }

    /**
     * Rerun Bundle actions for given rerunType
     *
     * @param jobId bundle job id
     * @param coordScope the rerun scope for coordinator job names separated by ","
     * @param dateScope the rerun scope for coordinator nominal times separated by ","
     * @param refresh true if user wants to refresh input/output dataset urls
     * @param noCleanup false if user wants to cleanup output events for given rerun actions
     * @throws BaseEngineException thrown if failed to rerun
     */
    public void reRun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup)
            throws BaseEngineException {
        try {
            new BundleRerunXCommand(jobId, coordScope, dateScope, refresh, noCleanup).call();
        }
        catch (CommandException ex) {
            throw new BaseEngineException(ex);
        }
    }

    /**
     * Resume a bundle job by calling {@link BundleJobResumeXCommand}.
     *
     * @param jobId bundle job id
     * @throws BundleEngineException thrown if the resume command fails
     * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
     */
    @Override
    public void resume(String jobId) throws BundleEngineException {
        BundleJobResumeXCommand resume = new BundleJobResumeXCommand(jobId);
        try {
            resume.call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Start a bundle job by calling {@link BundleStartXCommand}.
     *
     * @param jobId bundle job id
     * @throws BundleEngineException thrown if the start command fails
     * @see org.apache.oozie.BaseEngine#start(java.lang.String)
     */
    @Override
    public void start(String jobId) throws BundleEngineException {
        try {
            new BundleStartXCommand(jobId).call();
        }
        catch (CommandException e) {
            throw new BundleEngineException(e);
        }
    }

    /**
     * Stream the log of a bundle job to a writer.
     * <p>
     * The log filter is keyed on the job id, and the requested time window runs from the job's
     * created time to the moment this method is called.
     *
     * @param jobId bundle job id
     * @param writer destination for the log output
     * @throws IOException propagated from the log service
     * @throws BundleEngineException thrown if the job cannot be loaded
     * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String, java.io.Writer)
     */
    @Override
    public void streamLog(String jobId, Writer writer) throws IOException, BundleEngineException {
        XLogStreamer.Filter filter = new XLogStreamer.Filter();
        filter.setParameter(DagXLogInfoService.JOB, jobId);

        BundleJobBean job;
        try {
            job = new BundleJobXCommand(jobId).call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }

        Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
    }

    /**
     * Submit a bundle job by calling {@link BundleSubmitXCommand}, optionally starting it.
     *
     * @param conf job configuration
     * @param startJob if {@code true}, {@link #start(String)} is invoked on the new job id
     * @return the job id returned by the submit command
     * @throws BundleEngineException thrown if the submit command or the start fails
     * @see org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean)
     */
    @Override
    public String submitJob(Configuration conf, boolean startJob) throws BundleEngineException {
        try {
            String jobId = new BundleSubmitXCommand(conf, getAuthToken()).call();

            if (startJob) {
                start(jobId);
            }
            return jobId;
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Suspend a bundle job by calling {@link BundleJobSuspendXCommand}.
     *
     * @param jobId bundle job id
     * @throws BundleEngineException thrown if the suspend command fails
     * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
     */
    @Override
    public void suspend(String jobId) throws BundleEngineException {
        BundleJobSuspendXCommand suspend = new BundleJobSuspendXCommand(jobId);
        try {
            suspend.call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /** Filter names accepted by {@link #parseFilter(String)}. */
    private static final Set<String> FILTER_NAMES = new HashSet<String>();

    static {
        FILTER_NAMES.add(OozieClient.FILTER_USER);
        FILTER_NAMES.add(OozieClient.FILTER_NAME);
        FILTER_NAMES.add(OozieClient.FILTER_GROUP);
        FILTER_NAMES.add(OozieClient.FILTER_STATUS);
        FILTER_NAMES.add(OozieClient.FILTER_ID);
    }

    /**
     * Get bundle jobs
     *
     * @param filterStr the filter string
     * @param start start location for paging
     * @param len total length to get
     * @return bundle job info
     * @throws BundleEngineException thrown if failed to get bundle job info
     */
    public BundleJobInfo getBundleJobs(String filterStr, int start, int len) throws BundleEngineException {
        Map<String, List<String>> filter = parseFilter(filterStr);

        try {
            return new BundleJobsXCommand(filter, start, len).call();
        }
        catch (CommandException ex) {
            throw new BundleEngineException(ex);
        }
    }

    /**
     * Parse filter string to a map with key = filter name and values = filter values
     * <p>
     * The filter is a ";"-separated list of {@code name=value} elements. A name may repeat;
     * its values are collected in order of appearance. A {@code null} filter yields an empty map.
     *
     * @param filter the filter string
     * @return filter key and value map
     * @throws BundleEngineException thrown with {@link ErrorCode#E0420} if an element is not a
     *         {@code name=value} pair, the name is not an accepted filter name, or a status value
     *         is not a {@link Job.Status} constant
     */
    private Map<String, List<String>> parseFilter(String filter) throws BundleEngineException {
        Map<String, List<String>> map = new HashMap<String, List<String>>();
        if (filter == null) {
            return map;
        }

        StringTokenizer st = new StringTokenizer(filter, ";");
        while (st.hasMoreTokens()) {
            String token = st.nextToken();
            if (!token.contains("=")) {
                throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
            }

            // split drops trailing empty strings, so "name=" has length 1 and "a=b=c" has
            // length 3; both are rejected by the length check.
            String[] pair = token.split("=");
            if (pair.length != 2) {
                throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
            }
            String name = pair[0];
            String value = pair[1];

            if (!FILTER_NAMES.contains(name)) {
                throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
                        name));
            }

            // Compare against the same constant that registers the status filter in FILTER_NAMES,
            // so status validation cannot drift from the accepted filter name.
            if (OozieClient.FILTER_STATUS.equals(name)) {
                try {
                    Job.Status.valueOf(value);
                }
                catch (IllegalArgumentException ex) {
                    throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format(
                            "invalid status [{0}]", value));
                }
            }

            List<String> list = map.get(name);
            if (list == null) {
                list = new ArrayList<String>();
                map.put(name, list);
            }
            list.add(value);
        }
        return map;
    }

}