This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie;
019
020 import java.io.DataInput;
021 import java.io.DataOutput;
022 import java.io.IOException;
023 import java.sql.Timestamp;
024 import java.util.Date;
025 import java.util.Properties;
026
027 import javax.persistence.Basic;
028 import javax.persistence.Column;
029 import javax.persistence.Entity;
030 import javax.persistence.Lob;
031 import javax.persistence.NamedQueries;
032 import javax.persistence.NamedQuery;
033 import javax.persistence.Transient;
034
035 import org.apache.hadoop.io.Writable;
036 import org.apache.oozie.client.WorkflowAction;
037 import org.apache.oozie.client.rest.JsonWorkflowAction;
038 import org.apache.oozie.util.DateUtils;
039 import org.apache.oozie.util.ParamChecker;
040 import org.apache.oozie.util.PropertiesUtils;
041 import org.apache.oozie.util.WritableUtils;
042 import org.apache.openjpa.persistence.jdbc.Index;
043
044 /**
045 * Bean that contains all the information to start an action for a workflow node.
046 */
047 @Entity
048 @NamedQueries({
049
050 @NamedQuery(name = "UPDATE_ACTION", query = "update WorkflowActionBean a set a.conf = :conf, a.consoleUrl = :consoleUrl, a.data = :data, a.stats = :stats, a.externalChildIDs = :externalChildIDs, a.errorCode = :errorCode, a.errorMessage = :errorMessage, a.externalId = :externalId, a.externalStatus = :externalStatus, a.name = :name, a.cred = :cred , a.retries = :retries, a.trackerUri = :trackerUri, a.transition = :transition, a.type = :type, a.endTimestamp = :endTime, a.executionPath = :executionPath, a.lastCheckTimestamp = :lastCheckTime, a.logToken = :logToken, a.pending = :pending, a.pendingAgeTimestamp = :pendingAge, a.signalValue = :signalValue, a.slaXml = :slaXml, a.startTimestamp = :startTime, a.status = :status, a.wfId=:wfId where a.id = :id"),
051
052 @NamedQuery(name = "DELETE_ACTION", query = "delete from WorkflowActionBean a where a.id = :id"),
053
054 @NamedQuery(name = "DELETE_ACTIONS_FOR_WORKFLOW", query = "delete from WorkflowActionBean a where a.wfId = :wfId"),
055
056 @NamedQuery(name = "GET_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a"),
057
058 @NamedQuery(name = "GET_ACTION", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),
059
060 @NamedQuery(name = "GET_ACTION_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),
061
062 @NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
063
064 @NamedQuery(name = "GET_ACTIONS_OF_WORKFLOW_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
065
066 @NamedQuery(name = "GET_PENDING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.pendingAgeTimestamp < :pendingAge AND a.status <> 'RUNNING'"),
067
068 @NamedQuery(name = "GET_RUNNING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.status = 'RUNNING' AND a.lastCheckTimestamp < :lastCheckTime"),
069
070 @NamedQuery(name = "GET_RETRY_MANUAL_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId AND (a.status = 'START_RETRY' OR a.status = 'START_MANUAL' OR a.status = 'END_RETRY' OR a.status = 'END_MANUAL')") })
071
072 public class WorkflowActionBean extends JsonWorkflowAction implements Writable {
073
074 @Basic
075 @Index
076 @Column(name = "wf_id")
077 private String wfId = null;
078
079 @Basic
080 @Index
081 @Column(name = "status")
082 private String status = WorkflowAction.Status.PREP.toString();
083
084 @Basic
085 @Column(name = "last_check_time")
086 private java.sql.Timestamp lastCheckTimestamp;
087
088 @Basic
089 @Column(name = "end_time")
090 private java.sql.Timestamp endTimestamp = null;
091
092 @Basic
093 @Column(name = "start_time")
094 private java.sql.Timestamp startTimestamp = null;
095
096 @Basic
097 @Column(name = "execution_path", length = 1024)
098 private String executionPath = null;
099
100 @Basic
101 @Column(name = "pending")
102 private int pending = 0;
103
104 // @Temporal(TemporalType.TIME)
105 // @Column(name="pending_age",columnDefinition="timestamp default '0000-00-00 00:00:00'")
106 @Basic
107 @Index
108 @Column(name = "pending_age")
109 private java.sql.Timestamp pendingAgeTimestamp = null;
110
111 @Basic
112 @Column(name = "signal_value")
113 private String signalValue = null;
114
115 @Basic
116 @Column(name = "log_token")
117 private String logToken = null;
118
119 @Transient
120 private Date pendingAge;
121
122 @Column(name = "sla_xml")
123 @Lob
124 private String slaXml = null;
125
126 /**
127 * Default constructor.
128 */
129 public WorkflowActionBean() {
130 }
131
132 /**
133 * Serialize the action bean to a data output.
134 *
135 * @param dataOutput data output.
136 * @throws IOException thrown if the action bean could not be serialized.
137 */
138
139 public void write(DataOutput dataOutput) throws IOException {
140 WritableUtils.writeStr(dataOutput, getId());
141 WritableUtils.writeStr(dataOutput, getName());
142 WritableUtils.writeStr(dataOutput, getCred());
143 WritableUtils.writeStr(dataOutput, getType());
144 WritableUtils.writeStr(dataOutput, getConf());
145 WritableUtils.writeStr(dataOutput, getStatusStr());
146 dataOutput.writeInt(getRetries());
147 dataOutput.writeLong((getStartTime() != null) ? getStartTime().getTime() : -1);
148 dataOutput.writeLong((getEndTime() != null) ? getEndTime().getTime() : -1);
149 dataOutput.writeLong((getLastCheckTime() != null) ? getLastCheckTime().getTime() : -1);
150 WritableUtils.writeStr(dataOutput, getTransition());
151 WritableUtils.writeStr(dataOutput, getData());
152 WritableUtils.writeStr(dataOutput, getStats());
153 WritableUtils.writeStr(dataOutput, getExternalChildIDs());
154 WritableUtils.writeStr(dataOutput, getExternalId());
155 WritableUtils.writeStr(dataOutput, getExternalStatus());
156 WritableUtils.writeStr(dataOutput, getTrackerUri());
157 WritableUtils.writeStr(dataOutput, getConsoleUrl());
158 WritableUtils.writeStr(dataOutput, getErrorCode());
159 WritableUtils.writeStr(dataOutput, getErrorMessage());
160 WritableUtils.writeStr(dataOutput, wfId);
161 WritableUtils.writeStr(dataOutput, executionPath);
162 dataOutput.writeInt(pending);
163 dataOutput.writeLong((pendingAge != null) ? pendingAge.getTime() : -1);
164 WritableUtils.writeStr(dataOutput, signalValue);
165 WritableUtils.writeStr(dataOutput, logToken);
166 dataOutput.writeInt(getUserRetryCount());
167 dataOutput.writeInt(getUserRetryInterval());
168 dataOutput.writeInt(getUserRetryMax());
169 }
170
171 /**
172 * Deserialize an action bean from a data input.
173 *
174 * @param dataInput data input.
175 * @throws IOException thrown if the action bean could not be deserialized.
176 */
177 public void readFields(DataInput dataInput) throws IOException {
178 setId(WritableUtils.readStr(dataInput));
179 setName(WritableUtils.readStr(dataInput));
180 setCred(WritableUtils.readStr(dataInput));
181 setType(WritableUtils.readStr(dataInput));
182 setConf(WritableUtils.readStr(dataInput));
183 setStatus(WorkflowAction.Status.valueOf(WritableUtils.readStr(dataInput)));
184 setRetries(dataInput.readInt());
185 long d = dataInput.readLong();
186 if (d != -1) {
187 setStartTime(new Date(d));
188 }
189 d = dataInput.readLong();
190 if (d != -1) {
191 setEndTime(new Date(d));
192 }
193 d = dataInput.readLong();
194 if (d != -1) {
195 setLastCheckTime(new Date(d));
196 }
197 setTransition(WritableUtils.readStr(dataInput));
198 setData(WritableUtils.readStr(dataInput));
199 setStats(WritableUtils.readStr(dataInput));
200 setExternalChildIDs(WritableUtils.readStr(dataInput));
201 setExternalId(WritableUtils.readStr(dataInput));
202 setExternalStatus(WritableUtils.readStr(dataInput));
203 setTrackerUri(WritableUtils.readStr(dataInput));
204 setConsoleUrl(WritableUtils.readStr(dataInput));
205 setErrorInfo(WritableUtils.readStr(dataInput), WritableUtils.readStr(dataInput));
206 wfId = WritableUtils.readStr(dataInput);
207 executionPath = WritableUtils.readStr(dataInput);
208 pending = dataInput.readInt();
209 d = dataInput.readLong();
210 if (d != -1) {
211 pendingAge = new Date(d);
212 pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
213 }
214 signalValue = WritableUtils.readStr(dataInput);
215 logToken = WritableUtils.readStr(dataInput);
216 setUserRetryCount(dataInput.readInt());
217 setUserRetryInterval(dataInput.readInt());
218 setUserRetryMax(dataInput.readInt());
219 }
220
221 /**
222 * Return if the action execution is complete.
223 *
224 * @return if the action start is complete.
225 */
226 public boolean isExecutionComplete() {
227 return getStatus() == WorkflowAction.Status.DONE;
228 }
229
230 /**
231 * Return if the action is START_RETRY or START_MANUAL or END_RETRY or
232 * END_MANUAL.
233 *
234 * @return boolean true if status is START_RETRY or START_MANUAL or END_RETRY or
235 * END_MANUAL
236 */
237 public boolean isRetryOrManual() {
238 return (getStatus() == WorkflowAction.Status.START_RETRY || getStatus() == WorkflowAction.Status.START_MANUAL
239 || getStatus() == WorkflowAction.Status.END_RETRY || getStatus() == WorkflowAction.Status.END_MANUAL);
240 }
241
242 /**
243 * Return true if the action is USER_RETRY
244 *
245 * @return boolean true if status is USER_RETRY
246 */
247 public boolean isUserRetry() {
248 return (getStatus() == WorkflowAction.Status.USER_RETRY);
249 }
250
251 /**
252 * Return if the action is complete.
253 *
254 * @return if the action is complete.
255 */
256 public boolean isComplete() {
257 return getStatus() == WorkflowAction.Status.OK || getStatus() == WorkflowAction.Status.KILLED ||
258 getStatus() == WorkflowAction.Status.ERROR;
259 }
260
261 /**
262 * Set the action pending flag to true.
263 */
264 public void setPendingOnly() {
265 pending = 1;
266 }
267
268 /**
269 * Set the action as pending and the current time as pending.
270 */
271 public void setPending() {
272 pending = 1;
273 pendingAge = new Date();
274 pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
275 }
276
277 /**
278 * Set a time when the action will be pending, normally a time in the future.
279 *
280 * @param pendingAge the time when the action will be pending.
281 */
282 public void setPendingAge(Date pendingAge) {
283 this.pendingAge = pendingAge;
284 this.pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
285 }
286
287 /**
288 * Return the pending age of the action.
289 *
290 * @return the pending age of the action, <code>null</code> if the action is not pending.
291 */
292 public Date getPendingAge() {
293 return DateUtils.toDate(pendingAgeTimestamp);
294 }
295
296 /**
297 * Return if the action is pending.
298 *
299 * @return if the action is pending.
300 */
301 public boolean isPending() {
302 return pending == 1 ? true : false;
303 }
304
305 /**
306 * Removes the pending flag and pendingAge from the action.
307 */
308 public void resetPending() {
309 pending = 0;
310 pendingAge = null;
311 pendingAgeTimestamp = null;
312 }
313
314 /**
315 * Removes the pending flag from the action.
316 */
317 public void resetPendingOnly() {
318 pending = 0;
319 }
320
321 /**
322 * Increments the number of retries for the action.
323 */
324 public void incRetries() {
325 setRetries(getRetries() + 1);
326 }
327
328 /**
329 * Set a tracking information for an action, and set the action status to {@link Action.Status#DONE}
330 *
331 * @param externalId external ID for the action.
332 * @param trackerUri tracker URI for the action.
333 * @param consoleUrl console URL for the action.
334 */
335 public void setStartData(String externalId, String trackerUri, String consoleUrl) {
336 setExternalId(ParamChecker.notEmpty(externalId, "externalId"));
337 setTrackerUri(ParamChecker.notEmpty(trackerUri, "trackerUri"));
338 setConsoleUrl(ParamChecker.notEmpty(consoleUrl, "consoleUrl"));
339 Date now = new Date();
340 if (this.startTimestamp == null) {
341 setStartTime(now);
342 }
343 setLastCheckTime(now);
344 setStatus(Status.RUNNING);
345 }
346
347 /**
348 * Set the completion information for an action start. Sets the Action status to {@link Action.Status#DONE}
349 *
350 * @param externalStatus action external end status.
351 * @param actionData action output data, <code>null</code> if there is no action output data.
352 */
353 public void setExecutionData(String externalStatus, Properties actionData) {
354 setStatus(Status.DONE);
355 setExternalStatus(ParamChecker.notEmpty(externalStatus, "externalStatus"));
356 if (actionData != null) {
357 setData(PropertiesUtils.propertiesToString(actionData));
358 }
359 }
360
361 /**
362 * Return the action statistics info.
363 *
364 * @return Json representation of the stats.
365 */
366 public String getExecutionStats() {
367 return getStats();
368 }
369
370 /**
371 * Set the action statistics info for the workflow action.
372 *
373 * @param Json representation of the stats.
374 */
375 public void setExecutionStats(String jsonStats) {
376 setStats(jsonStats);
377 }
378
379 /**
380 * Return the external child IDs.
381 *
382 * @return externalChildIDs as a string.
383 */
384 public String getExternalChildIDs() {
385 return super.getExternalChildIDs();
386 }
387
388 /**
389 * Set the external child IDs for the workflow action.
390 *
391 * @param externalChildIDs as a string.
392 */
393 public void setExternalChildIDs(String externalChildIDs) {
394 super.setExternalChildIDs(externalChildIDs);
395 }
396
397 /**
398 * Set the completion information for an action end.
399 *
400 * @param status action status, {@link Action.Status#OK} or {@link Action.Status#ERROR} or {@link
401 * Action.Status#KILLED}
402 * @param signalValue the signal value. In most cases, the value should be OK or ERROR.
403 */
404 public void setEndData(Status status, String signalValue) {
405 if (status == null || (status != Status.OK && status != Status.ERROR && status != Status.KILLED)) {
406 throw new IllegalArgumentException("Action status must be OK, ERROR or KILLED. Received ["
407 + status.toString() + "]");
408 }
409 if (status == Status.OK) {
410 setErrorInfo(null, null);
411 }
412 setStatus(status);
413 setSignalValue(ParamChecker.notEmpty(signalValue, "signalValue"));
414 }
415
416
417 /**
418 * Return the job Id.
419 *
420 * @return the job Id.
421 */
422 public String getJobId() {
423 return wfId;
424 }
425
426 /**
427 * Return the job Id.
428 *
429 * @return the job Id.
430 */
431 public String getWfId() {
432 return wfId;
433 }
434
435 /**
436 * Set the job id.
437 *
438 * @param id jobId;
439 */
440 public void setJobId(String id) {
441 this.wfId = id;
442 }
443
444 public String getSlaXml() {
445 return slaXml;
446 }
447
448 public void setSlaXml(String slaXml) {
449 this.slaXml = slaXml;
450 }
451
452 @Override
453 public void setStatus(Status val) {
454 this.status = val.toString();
455 super.setStatus(val);
456 }
457
458 public String getStatusStr() {
459 return status;
460 }
461
462 @Override
463 public Status getStatus() {
464 return Status.valueOf(this.status);
465 }
466
467 /**
468 * Return the node execution path.
469 *
470 * @return the node execution path.
471 */
472 public String getExecutionPath() {
473 return executionPath;
474 }
475
476 /**
477 * Set the node execution path.
478 *
479 * @param executionPath the node execution path.
480 */
481 public void setExecutionPath(String executionPath) {
482 this.executionPath = executionPath;
483 }
484
485 /**
486 * Return the signal value for the action. <p/> For decision nodes it is the choosen transition, for actions it is
487 * OK or ERROR.
488 *
489 * @return the action signal value.
490 */
491 public String getSignalValue() {
492 return signalValue;
493 }
494
495 /**
496 * Set the signal value for the action. <p/> For decision nodes it is the choosen transition, for actions it is OK
497 * or ERROR.
498 *
499 * @param signalValue the action signal value.
500 */
501 public void setSignalValue(String signalValue) {
502 this.signalValue = signalValue;
503 }
504
505 /**
506 * Return the job log token.
507 *
508 * @return the job log token.
509 */
510 public String getLogToken() {
511 return logToken;
512 }
513
514 /**
515 * Set the job log token.
516 *
517 * @param logToken the job log token.
518 */
519 public void setLogToken(String logToken) {
520 this.logToken = logToken;
521 }
522
523 /**
524 * Return the action last check time
525 *
526 * @return the last check time
527 */
528 public Date getLastCheckTime() {
529 return DateUtils.toDate(lastCheckTimestamp);
530 }
531
532 /**
533 * Return the action last check time
534 *
535 * @return the last check time
536 */
537 public Timestamp getLastCheckTimestamp() {
538 return lastCheckTimestamp;
539 }
540
541 /**
542 * Return the action last check time
543 *
544 * @return the last check time
545 */
546 public Timestamp getStartTimestamp() {
547 return startTimestamp;
548 }
549
550 /**
551 * Return the action last check time
552 *
553 * @return the last check time
554 */
555 public Timestamp getEndTimestamp() {
556 return endTimestamp;
557 }
558
559
560 /**
561 * Return the action last check time
562 *
563 * @return the last check time
564 */
565 public Timestamp getPendingAgeTimestamp() {
566 return pendingAgeTimestamp;
567 }
568
569 /**
570 * Sets the action last check time
571 *
572 * @param lastCheckTime the last check time to set.
573 */
574 public void setLastCheckTime(Date lastCheckTime) {
575 this.lastCheckTimestamp = DateUtils.convertDateToTimestamp(lastCheckTime);
576 }
577
578 public boolean getPending() {
579 return this.pending == 1 ? true : false;
580 }
581
582 @Override
583 public Date getStartTime() {
584 return DateUtils.toDate(startTimestamp);
585 }
586
587 @Override
588 public void setStartTime(Date startTime) {
589 super.setStartTime(startTime);
590 this.startTimestamp = DateUtils.convertDateToTimestamp(startTime);
591 }
592
593 @Override
594 public Date getEndTime() {
595 return DateUtils.toDate(endTimestamp);
596 }
597
598 @Override
599 public void setEndTime(Date endTime) {
600 super.setEndTime(endTime);
601 this.endTimestamp = DateUtils.convertDateToTimestamp(endTime);
602 }
603
604 }