This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie;
019
020 import java.io.IOException;
021 import java.io.Writer;
022 import java.text.ParseException;
023 import java.util.ArrayList;
024 import java.util.Date;
025 import java.util.HashMap;
026 import java.util.HashSet;
027 import java.util.List;
028 import java.util.Map;
029 import java.util.Set;
030 import java.util.StringTokenizer;
031
032 import org.apache.hadoop.conf.Configuration;
033 import org.apache.oozie.client.CoordinatorAction;
034 import org.apache.oozie.client.CoordinatorJob;
035 import org.apache.oozie.client.Job;
036 import org.apache.oozie.client.OozieClient;
037 import org.apache.oozie.client.WorkflowJob;
038 import org.apache.oozie.client.rest.BulkResponseImpl;
039 import org.apache.oozie.command.BulkJobsXCommand;
040 import org.apache.oozie.command.CommandException;
041 import org.apache.oozie.command.bundle.BundleJobChangeXCommand;
042 import org.apache.oozie.command.bundle.BundleJobResumeXCommand;
043 import org.apache.oozie.command.bundle.BundleJobSuspendXCommand;
044 import org.apache.oozie.command.bundle.BundleJobXCommand;
045 import org.apache.oozie.command.bundle.BundleJobsXCommand;
046 import org.apache.oozie.command.bundle.BundleKillXCommand;
047 import org.apache.oozie.command.bundle.BundleRerunXCommand;
048 import org.apache.oozie.command.bundle.BundleStartXCommand;
049 import org.apache.oozie.command.bundle.BundleSubmitXCommand;
050 import org.apache.oozie.service.DagXLogInfoService;
051 import org.apache.oozie.service.Services;
052 import org.apache.oozie.service.XLogService;
053 import org.apache.oozie.util.DateUtils;
054 import org.apache.oozie.util.ParamChecker;
055 import org.apache.oozie.util.XLog;
056 import org.apache.oozie.util.XLogStreamer;
057
058 import com.google.common.annotations.VisibleForTesting;
059
060 public class BundleEngine extends BaseEngine {
061 /**
062 * Create a system Bundle engine, with no user and no group.
063 */
064 public BundleEngine() {
065 }
066
067 /**
068 * Create a Bundle engine to perform operations on behave of a user.
069 *
070 * @param user user name.
071 */
072 public BundleEngine(String user) {
073 this.user = ParamChecker.notEmpty(user, "user");
074 }
075
076 /* (non-Javadoc)
077 * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
078 */
079 @Override
080 public void change(String jobId, String changeValue) throws BundleEngineException {
081 try {
082 BundleJobChangeXCommand change = new BundleJobChangeXCommand(jobId, changeValue);
083 change.call();
084 }
085 catch (CommandException ex) {
086 throw new BundleEngineException(ex);
087 }
088 }
089
090 /* (non-Javadoc)
091 * @see org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
092 */
093 @Override
094 public String dryRunSubmit(Configuration conf) throws BundleEngineException {
095 BundleSubmitXCommand submit = new BundleSubmitXCommand(true, conf);
096 try {
097 String jobId = submit.call();
098 return jobId;
099 }
100 catch (CommandException ex) {
101 throw new BundleEngineException(ex);
102 }
103 }
104
105 /* (non-Javadoc)
106 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
107 */
108 @Override
109 public CoordinatorJob getCoordJob(String jobId) throws BundleEngineException {
110 throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a coordinator job from BundleEngine"));
111 }
112
113 public BundleJobBean getBundleJob(String jobId) throws BundleEngineException {
114 try {
115 return new BundleJobXCommand(jobId).call();
116 }
117 catch (CommandException ex) {
118 throw new BundleEngineException(ex);
119 }
120 }
121
122 /* (non-Javadoc)
123 * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, int, int)
124 */
125 @Override
126 public CoordinatorJob getCoordJob(String jobId, String filter, int start, int length, boolean desc)
127 throws BundleEngineException {
128 throw new BundleEngineException(new XException(ErrorCode.E0301,
129 "cannot get a coordinator job from BundleEngine"));
130 }
131
132 /* (non-Javadoc)
133 * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
134 */
135 @Override
136 public String getDefinition(String jobId) throws BundleEngineException {
137 BundleJobBean job;
138 try {
139 job = new BundleJobXCommand(jobId).call();
140 }
141 catch (CommandException ex) {
142 throw new BundleEngineException(ex);
143 }
144 return job.getOrigJobXml();
145 }
146
147 /* (non-Javadoc)
148 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
149 */
150 @Override
151 public WorkflowJob getJob(String jobId) throws BundleEngineException {
152 throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
153 }
154
155 /* (non-Javadoc)
156 * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
157 */
158 @Override
159 public WorkflowJob getJob(String jobId, int start, int length) throws BundleEngineException {
160 throw new BundleEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from BundleEngine"));
161 }
162
163 /* (non-Javadoc)
164 * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
165 */
166 @Override
167 public String getJobIdForExternalId(String externalId) throws BundleEngineException {
168 return null;
169 }
170
171 /* (non-Javadoc)
172 * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
173 */
174 @Override
175 public void kill(String jobId) throws BundleEngineException {
176 try {
177 new BundleKillXCommand(jobId).call();
178 }
179 catch (CommandException e) {
180 throw new BundleEngineException(e);
181 }
182 }
183
184 /* (non-Javadoc)
185 * @see org.apache.oozie.BaseEngine#reRun(java.lang.String, org.apache.hadoop.conf.Configuration)
186 */
187 @Override
188 @Deprecated
189 public void reRun(String jobId, Configuration conf) throws BundleEngineException {
190 throw new BundleEngineException(new XException(ErrorCode.E0301, "rerun"));
191 }
192
193 /**
194 * Rerun Bundle actions for given rerunType
195 *
196 * @param jobId bundle job id
197 * @param coordScope the rerun scope for coordinator job names separated by ","
198 * @param dateScope the rerun scope for coordinator nominal times separated by ","
199 * @param refresh true if user wants to refresh input/outpur dataset urls
200 * @param noCleanup false if user wants to cleanup output events for given rerun actions
201 * @throws BaseEngineException thrown if failed to rerun
202 */
203 public void reRun(String jobId, String coordScope, String dateScope, boolean refresh, boolean noCleanup)
204 throws BaseEngineException {
205 try {
206 new BundleRerunXCommand(jobId, coordScope, dateScope, refresh, noCleanup).call();
207 }
208 catch (CommandException ex) {
209 throw new BaseEngineException(ex);
210 }
211 }
212
213 /* (non-Javadoc)
214 * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
215 */
216 @Override
217 public void resume(String jobId) throws BundleEngineException {
218 BundleJobResumeXCommand resume = new BundleJobResumeXCommand(jobId);
219 try {
220 resume.call();
221 }
222 catch (CommandException ex) {
223 throw new BundleEngineException(ex);
224 }
225 }
226
227 /* (non-Javadoc)
228 * @see org.apache.oozie.BaseEngine#start(java.lang.String)
229 */
230 @Override
231 public void start(String jobId) throws BundleEngineException {
232 try {
233 new BundleStartXCommand(jobId).call();
234 }
235 catch (CommandException e) {
236 throw new BundleEngineException(e);
237 }
238 }
239
240 /* (non-Javadoc)
241 * @see org.apache.oozie.BaseEngine#streamLog(java.lang.String, java.io.Writer)
242 */
243 @Override
244 public void streamLog(String jobId, Writer writer) throws IOException, BundleEngineException {
245 XLogStreamer.Filter filter = new XLogStreamer.Filter();
246 filter.setParameter(DagXLogInfoService.JOB, jobId);
247
248 BundleJobBean job;
249 try {
250 job = new BundleJobXCommand(jobId).call();
251 }
252 catch (CommandException ex) {
253 throw new BundleEngineException(ex);
254 }
255
256 Services.get().get(XLogService.class).streamLog(filter, job.getCreatedTime(), new Date(), writer);
257 }
258
259 /* (non-Javadoc)
260 * @see org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration, boolean)
261 */
262 @Override
263 public String submitJob(Configuration conf, boolean startJob) throws BundleEngineException {
264 try {
265 String jobId = new BundleSubmitXCommand(conf).call();
266
267 if (startJob) {
268 start(jobId);
269 }
270 return jobId;
271 }
272 catch (CommandException ex) {
273 throw new BundleEngineException(ex);
274 }
275 }
276
277 /* (non-Javadoc)
278 * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
279 */
280 @Override
281 public void suspend(String jobId) throws BundleEngineException {
282 BundleJobSuspendXCommand suspend = new BundleJobSuspendXCommand(jobId);
283 try {
284 suspend.call();
285 }
286 catch (CommandException ex) {
287 throw new BundleEngineException(ex);
288 }
289 }
290
291 private static final Set<String> FILTER_NAMES = new HashSet<String>();
292
293 static {
294 FILTER_NAMES.add(OozieClient.FILTER_USER);
295 FILTER_NAMES.add(OozieClient.FILTER_NAME);
296 FILTER_NAMES.add(OozieClient.FILTER_GROUP);
297 FILTER_NAMES.add(OozieClient.FILTER_STATUS);
298 FILTER_NAMES.add(OozieClient.FILTER_ID);
299 }
300
301 /**
302 * Get bundle jobs
303 *
304 * @param filter the filter string
305 * @param start start location for paging
306 * @param len total length to get
307 * @return bundle job info
308 * @throws BundleEngineException thrown if failed to get bundle job info
309 */
310 public BundleJobInfo getBundleJobs(String filter, int start, int len) throws BundleEngineException {
311 Map<String, List<String>> filterList = parseFilter(filter);
312
313 try {
314 return new BundleJobsXCommand(filterList, start, len).call();
315 }
316 catch (CommandException ex) {
317 throw new BundleEngineException(ex);
318 }
319 }
320
321 /**
322 * Parse filter string to a map with key = filter name and values = filter values
323 *
324 * @param filter the filter string
325 * @return filter key and value map
326 * @throws CoordinatorEngineException thrown if failed to parse filter string
327 */
328 @VisibleForTesting
329 Map<String, List<String>> parseFilter(String filter) throws BundleEngineException {
330 Map<String, List<String>> map = new HashMap<String, List<String>>();
331 if (filter != null) {
332 StringTokenizer st = new StringTokenizer(filter, ";");
333 while (st.hasMoreTokens()) {
334 String token = st.nextToken();
335 if (token.contains("=")) {
336 String[] pair = token.split("=");
337 if (pair.length != 2) {
338 throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
339 }
340 if (!FILTER_NAMES.contains(pair[0])) {
341 throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
342 pair[0]));
343 }
344 if (pair[0].equals("status")) {
345 try {
346 Job.Status.valueOf(pair[1]);
347 }
348 catch (IllegalArgumentException ex) {
349 throw new BundleEngineException(ErrorCode.E0420, filter, XLog.format(
350 "invalid status [{0}]", pair[1]));
351 }
352 }
353 List<String> list = map.get(pair[0]);
354 if (list == null) {
355 list = new ArrayList<String>();
356 map.put(pair[0], list);
357 }
358 list.add(pair[1]);
359 }
360 else {
361 throw new BundleEngineException(ErrorCode.E0420, filter, "elements must be name=value pairs");
362 }
363 }
364 }
365 return map;
366 }
367
368 /**
369 * Get bulk job response
370 *
371 * @param filter the filter string
372 * @param start start location for paging
373 * @param len total length to get
374 * @return bulk job info
375 * @throws BundleEngineException thrown if failed to get bulk job info
376 */
377 public BulkResponseInfo getBulkJobs(String bulkFilter, int start, int len) throws BundleEngineException {
378 Map<String,List<String>> bulkRequestMap = parseBulkFilter(bulkFilter);
379 try {
380 return new BulkJobsXCommand(bulkRequestMap, start, len).call();
381 }
382 catch (CommandException ex) {
383 throw new BundleEngineException(ex);
384 }
385 }
386
387 /**
388 * Parse filter string to a map with key = filter name and values = filter values
389 * Allowed keys are defined as constants on top
390 *
391 * @param filter the filter string
392 * @return filter key-value pair map
393 * @throws BundleEngineException thrown if failed to parse filter string
394 */
395 public static Map<String,List<String>> parseBulkFilter(String bulkParams) throws BundleEngineException {
396
397 Map<String,List<String>> bulkFilter = new HashMap<String,List<String>>();
398 // Functionality can be extended to different job levels - TODO extend filter parser and query
399 // E.g. String filterlevel = "coordinatoraction"; BulkResponseImpl.BULK_FILTER_LEVEL
400 if (bulkFilter != null) {
401 StringTokenizer st = new StringTokenizer(bulkParams, ";");
402 while (st.hasMoreTokens()) {
403 String token = st.nextToken();
404 if (token.contains("=")) {
405 String[] pair = token.split("=");
406 if (pair.length != 2) {
407 throw new BundleEngineException(ErrorCode.E0420, token,
408 "elements must be name=value pairs");
409 }
410 pair[0] = pair[0].toLowerCase();
411 String[] values = pair[1].split(",");
412 if (!BulkResponseImpl.BULK_FILTER_NAMES.contains(pair[0])) {
413 throw new BundleEngineException(ErrorCode.E0420, token, XLog.format("invalid parameter name [{0}]",
414 pair[0]));
415 }
416 // special check and processing for time related params
417 if (pair[0].contains("time")) {
418 try {
419 DateUtils.parseDateUTC(pair[1]);
420 }
421 catch (ParseException e) {
422 throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
423 "invalid value [{0}] for time. A datetime value of pattern [{1}] is expected", pair[1],
424 DateUtils.ISO8601_UTC_MASK));
425 }
426 }
427 // special check for action status param
428 // TODO: when extended for levels other than coord action, check against corresponding level's Status values
429 if (pair[0].equals(BulkResponseImpl.BULK_FILTER_STATUS)) {
430 for(String value : values) {
431 try {
432 CoordinatorAction.Status.valueOf(value);
433 }
434 catch (IllegalArgumentException ex) {
435 throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
436 "invalid action status [{0}]", value));
437 }
438 }
439 }
440 // eventually adding into map for all cases e.g. names, times, status
441 List<String> list = bulkFilter.get(pair[0]);
442 if (list == null) {
443 list = new ArrayList<String>();
444 bulkFilter.put(pair[0], list);
445 }
446 for(String value : values) {
447 value = value.trim();
448 if(value.isEmpty()) {
449 throw new BundleEngineException(ErrorCode.E0420, token, "value is empty or whitespace");
450 }
451 list.add(value);
452 }
453 } else {
454 throw new BundleEngineException(ErrorCode.E0420, token, "elements must be name=value pairs");
455 }
456 }
457 if(!bulkFilter.containsKey(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME)) {
458 throw new BundleEngineException(ErrorCode.E0305, BulkResponseImpl.BULK_FILTER_BUNDLE_NAME);
459 }
460 }
461 return bulkFilter;
462 }
463 }