This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie;
019
020 import java.io.IOException;
021 import java.io.Writer;
022 import java.text.ParseException;
023 import java.util.ArrayList;
024 import java.util.Date;
025 import java.util.HashMap;
026 import java.util.HashSet;
027 import java.util.List;
028 import java.util.Map;
029 import java.util.Set;
030 import java.util.StringTokenizer;
031
032 import org.apache.hadoop.conf.Configuration;
033 import org.apache.oozie.client.CoordinatorAction;
034 import org.apache.oozie.client.CoordinatorJob;
035 import org.apache.oozie.client.Job;
036 import org.apache.oozie.client.OozieClient;
037 import org.apache.oozie.client.WorkflowJob;
038 import org.apache.oozie.client.rest.BulkResponseImpl;
039 import org.apache.oozie.command.BulkJobsXCommand;
040 import org.apache.oozie.command.CommandException;
041 import org.apache.oozie.command.bundle.BundleJobChangeXCommand;
042 import org.apache.oozie.command.bundle.BundleJobResumeXCommand;
043 import org.apache.oozie.command.bundle.BundleJobSuspendXCommand;
044 import org.apache.oozie.command.bundle.BundleJobXCommand;
045 import org.apache.oozie.command.bundle.BundleJobsXCommand;
046 import org.apache.oozie.command.bundle.BundleKillXCommand;
047 import org.apache.oozie.command.bundle.BundleRerunXCommand;
048 import org.apache.oozie.command.bundle.BundleStartXCommand;
049 import org.apache.oozie.command.bundle.BundleSubmitXCommand;
050 import org.apache.oozie.service.DagXLogInfoService;
051 import org.apache.oozie.service.Services;
052 import org.apache.oozie.service.XLogService;
053 import org.apache.oozie.util.DateUtils;
054 import org.apache.oozie.util.ParamChecker;
055 import org.apache.oozie.util.XLog;
056 import org.apache.oozie.util.XLogStreamer;
057
058 public class BundleEngine extends BaseEngine {
059 /**
060 * Create a system Bundle engine, with no user and no group.
061 */
062 public BundleEngine() {
063 }
064
065 /**
066 * Create a Bundle engine to perform operations on behave of a user.
067 *
068 * @param user user name.
069 * @param authToken the authentication token.
070 */
071 public BundleEngine(String user, String authToken) {
072 this.user = ParamChecker.notEmpty(user, "user");
073 this.authToken = ParamChecker.notEmpty(authToken, "authToken");
074 }
075
076 /* (non-Javadoc)
077 * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
078 */
079 @Override
080 public void change(String jobId, String changeValue) throws BundleEngineException {
081 try {
082 BundleJobChangeXCommand change = new BundleJobChangeXCommand(jobId, changeValue);
083 change.call();
084 }
085 catch (CommandException ex) {
086 throw new BundleEngineException(ex);
087 }
088 }
089
090 /* (non-Javadoc)
091 * @see org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
092 */
093 @Override
094 public String dryRunSubmit(Configuration conf) throws BundleEngineException {
095 BundleSubmitXCommand submit = new BundleSubmitXCommand(true, conf, getAuthToken());
096 try {
097 String jobId = submit.call();
098 return jobId;
099 }
100 catch (CommandException ex) {
101 throw new BundleEngineException(ex);
102 }
103 }
104
105 /* (non-Javadoc)
106 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
107 */
108 @Override
109 public CoordinatorJob getCoordJob(String jobId) throws BundleEngineException {
110 throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from BundleEngine"));
111 }
112
113 public BundleJobBean getBundleJob(String jobId) throws BundleEngineException {
114 try {
115 return new BundleJobXCommand(jobId).call();
116 }
117 catch (CommandException ex) {
118 throw new BundleEngineException(ex);
119 }
120 }
121
122 /* (non-Javadoc)
123 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, int, int)
124 */
125 @Override
126 public CoordinatorJob getCoordJob(String jobId, String filter, int start, int length) throws BundleEngineException {
127 throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from BundleEngine"));
128 }
129
130 /* (non-Javadoc)
131 * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
132 */
133 @Override
134 public String getDefinition(String jobId) throws BundleEngineException {
135 BundleJobBean job;
136 try {
137 job = new BundleJobXCommand(jobId).call();
138 }
139 catch (CommandException ex) {
140 throw new BundleEngineException(ex);
141 }
142 return job.getOrigJobXml();
143 }
144
145 /* (non-Javadoc)
146 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
147 */
148 @Override
149 public WorkflowJob getJob(String jobId) throws BundleEngineException {
150 throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
151 }
152
153 /* (non-Javadoc)
154 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
155 */
156 @Override
157 public WorkflowJob getJob(String jobId, int start, int length) throws BundleEngineException {
158 throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
159 }
160
161 /* (non-Javadoc)
162 * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
163 */
164 @Override
165 public String getJobIdForExternalId(String externalId) throws BundleEngineException {
166 return null;
167 }
168
169 /* (non-Javadoc)
170 * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
171 */
172 @Override
173 public void kill(String jobId) throws BundleEngineException {
174 try {
175 new BundleKillXCommand(jobId).call();
176 }
177 catch (CommandException e) {
178 throw new BundleEngineException(e);
179 }
180 }
181
182 /* (non-Javadoc)
183 * @see org.apache.oozie.BaseEngine#reRun(java.lang.String, org.apache.hadoop.conf.Configuration)
184 */
185 @Override
186 @Deprecated
187 public void reRun(String jobId, Configuration conf) throws BundleEngineException {
188 throw new BundleEngineException(new XException(ErrorCode.E0301, "rerun"));
189 }
190
191 /**
192 * Rerun Bundle actions for given rerunType
193 *
194 * @param jobId bundle job id
195 * @param coordScope the rerun scope for coordinator job names separated by ","
196 * @param dateScope the rerun scope for coordinator nominal times separated by ","
197 * @param refresh true if user wants to refresh input/outpur dataset urls
198 * @param noCleanup false if user wants to cleanup output events for given rerun actions
199 * @throws BaseEngineException thrown if failed to rerun
200 */
201 public void reRun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup)
202 throws BaseEngineException {
203 try {
204 new BundleRerunXCommand(jobId, coordScope, dateScope, refresh, noCleanup).call();
205 }
206 catch (CommandException ex) {
207 throw new BaseEngineException(ex);
208 }
209 }
210
211 /* (non-Javadoc)
212 * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
213 */
214 @Override
215 public void resume(String jobId) throws BundleEngineException {
216 BundleJobResumeXCommand resume = new BundleJobResumeXCommand(jobId);
217 try {
218 resume.call();
219 }
220 catch (CommandException ex) {
221 throw new BundleEngineException(ex);
222 }
223 }
224
225 /* (non-Javadoc)
226 * @see org.apache.oozie.BaseEngine#start(java.lang.String)
227 */
228 @Override
229 public void start(String jobId) throws BundleEngineException {
230 try {
231 new BundleStartXCommand(jobId).call();
232 }
233 catch (CommandException e) {
234 throw new BundleEngineException(e);
235 }
236 }
237
238 /* (non-Javadoc)
239 * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String, java.io.Writer)
240 */
241 @Override
242 public void streamLog(String jobId, Writer writer) throws IOException, BundleEngineException {
243 XLogStreamer.Filter filter = new XLogStreamer.Filter();
244 filter.setParameter(DagXLogInfoService.JOB, jobId);
245
246 BundleJobBean job;
247 try {
248 job = new BundleJobXCommand(jobId).call();
249 }
250 catch (CommandException ex) {
251 throw new BundleEngineException(ex);
252 }
253
254 Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
255 }
256
257 /* (non-Javadoc)
258 * @see org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean)
259 */
260 @Override
261 public String submitJob(Configuration conf, boolean startJob) throws BundleEngineException {
262 try {
263 String jobId = new BundleSubmitXCommand(conf, getAuthToken()).call();
264
265 if (startJob) {
266 start(jobId);
267 }
268 return jobId;
269 }
270 catch (CommandException ex) {
271 throw new BundleEngineException(ex);
272 }
273 }
274
275 /* (non-Javadoc)
276 * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
277 */
278 @Override
279 public void suspend(String jobId) throws BundleEngineException {
280 BundleJobSuspendXCommand suspend = new BundleJobSuspendXCommand(jobId);
281 try {
282 suspend.call();
283 }
284 catch (CommandException ex) {
285 throw new BundleEngineException(ex);
286 }
287 }
288
289 private static final Set<String> FILTER_NAMES = new HashSet<String>();
290
291 static {
292 FILTER_NAMES.add(OozieClient.FILTER_USER);
293 FILTER_NAMES.add(OozieClient.FILTER_NAME);
294 FILTER_NAMES.add(OozieClient.FILTER_GROUP);
295 FILTER_NAMES.add(OozieClient.FILTER_STATUS);
296 FILTER_NAMES.add(OozieClient.FILTER_ID);
297 }
298
299 /**
300 * Get bundle jobs
301 *
302 * @param filter the filter string
303 * @param start start location for paging
304 * @param len total length to get
305 * @return bundle job info
306 * @throws BundleEngineException thrown if failed to get bundle job info
307 */
308 public BundleJobInfo getBundleJobs(String filter, int start, int len) throws BundleEngineException {
309 Map<String, List<String>> filterList = parseFilter(filter);
310
311 try {
312 return new BundleJobsXCommand(filterList, start, len).call();
313 }
314 catch (CommandException ex) {
315 throw new BundleEngineException(ex);
316 }
317 }
318
319 /**
320 * Parse filter string to a map with key = filter name and values = filter values
321 *
322 * @param filter the filter string
323 * @return filter key and value map
324 * @throws CoordinatorEngineException thrown if failed to parse filter string
325 */
326 private Map<String, List<String>> parseFilter(String filter) throws BundleEngineException {
327 Map<String, List<String>> map = new HashMap<String, List<String>>();
328 if (filter != null) {
329 StringTokenizer st = new StringTokenizer(filter, ";");
330 while (st.hasMoreTokens()) {
331 String token = st.nextToken();
332 if (token.contains("=")) {
333 String[] pair = token.split("=");
334 if (pair.length != 2) {
335 throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
336 }
337 if (!FILTER_NAMES.contains(pair[0])) {
338 throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
339 pair[0]));
340 }
341 if (pair[0].equals("status")) {
342 try {
343 Job.Status.valueOf(pair[1]);
344 }
345 catch (IllegalArgumentException ex) {
346 throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format(
347 "invalid status [{0}]", pair[1]));
348 }
349 }
350 List<String> list = map.get(pair[0]);
351 if (list == null) {
352 list = new ArrayList<String>();
353 map.put(pair[0], list);
354 }
355 list.add(pair[1]);
356 }
357 else {
358 throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
359 }
360 }
361 }
362 return map;
363 }
364
365 /**
366 * Get bulk job response
367 *
368 * @param filter the filter string
369 * @param start start location for paging
370 * @param len total length to get
371 * @return bulk job info
372 * @throws BundleEngineException thrown if failed to get bulk job info
373 */
374 public BulkResponseInfo getBulkJobs(String bulkFilter, int start, int len) throws BundleEngineException {
375 Map<String,List<String>> bulkRequestMap = parseBulkFilter(bulkFilter);
376 try {
377 return new BulkJobsXCommand(bulkRequestMap, start, len).call();
378 }
379 catch (CommandException ex) {
380 throw new BundleEngineException(ex);
381 }
382 }
383
384 /**
385 * Parse filter string to a map with key = filter name and values = filter values
386 * Allowed keys are defined as constants on top
387 *
388 * @param filter the filter string
389 * @return filter key-value pair map
390 * @throws BundleEngineException thrown if failed to parse filter string
391 */
392 public static Map<String,List<String>> parseBulkFilter(String bulkParams) throws BundleEngineException {
393
394 Map<String,List<String>> bulkFilter = new HashMap<String,List<String>>();
395 // Functionality can be extended to different job levels - TODO extend filter parser and query
396 // E.g. String filterlevel = "coordinatoraction"; BulkResponseImpl.BULK_FILTER_LEVEL
397 if (bulkFilter != null) {
398 StringTokenizer st = new StringTokenizer(bulkParams, ";");
399 while (st.hasMoreTokens()) {
400 String token = st.nextToken();
401 if (token.contains("=")) {
402 String[] pair = token.split("=");
403 if (pair.length != 2) {
404 throw new BundleEngineException(ErrorCode.E0420, token,
405 "elements must be name=value pairs");
406 }
407 pair[0] = pair[0].toLowerCase();
408 String[] values = pair[1].split(",");
409 if (!BulkResponseImpl.BULK_FILTER_NAMES.contains(pair[0])) {
410 throw new BundleEngineException(ErrorCode.E0420, token, XLog.format("invalid parameter name [{0}]",
411 pair[0]));
412 }
413 // special check and processing for time related params
414 if (pair[0].contains("time")) {
415 try {
416 DateUtils.parseDateUTC(pair[1]);
417 }
418 catch (ParseException e) {
419 throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
420 "invalid value [{0}] for time. A datetime value of pattern [{1}] is expected", pair[1],
421 DateUtils.ISO8601_UTC_MASK));
422 }
423 }
424 // special check for action status param
425 // TODO: when extended for levels other than coord action, check against corresponding level's Status values
426 if (pair[0].equals(BulkResponseImpl.BULK_FILTER_STATUS)) {
427 for(String value : values) {
428 try {
429 CoordinatorAction.Status.valueOf(value);
430 }
431 catch (IllegalArgumentException ex) {
432 throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
433 "invalid action status [{0}]", value));
434 }
435 }
436 }
437 // eventually adding into map for all cases e.g. names, times, status
438 List<String> list = bulkFilter.get(pair[0]);
439 if (list == null) {
440 list = new ArrayList<String>();
441 bulkFilter.put(pair[0], list);
442 }
443 for(String value : values) {
444 value = value.trim();
445 if(value.isEmpty()) {
446 throw new BundleEngineException(ErrorCode.E0420, token, "value is empty or whitespace");
447 }
448 list.add(value);
449 }
450 } else {
451 throw new BundleEngineException(ErrorCode.E0420, token, "elements must be name=value pairs");
452 }
453 }
454 if(!bulkFilter.containsKey(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME)) {
455 throw new BundleEngineException(ErrorCode.E0305, BulkResponseImpl.BULK_FILTER_BUNDLE_NAME);
456 }
457 }
458 return bulkFilter;
459 }
460 }