This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie;
019
020 import java.io.DataInput;
021 import java.io.DataOutput;
022 import java.io.IOException;
023 import java.sql.Timestamp;
024 import java.util.Date;
025 import java.util.Properties;
026
027 import javax.persistence.Basic;
028 import javax.persistence.Column;
029 import javax.persistence.Entity;
030 import javax.persistence.Lob;
031 import javax.persistence.NamedQueries;
032 import javax.persistence.NamedQuery;
033 import javax.persistence.Transient;
034
035 import org.apache.hadoop.io.Writable;
036 import org.apache.oozie.client.WorkflowAction;
037 import org.apache.oozie.client.rest.JsonWorkflowAction;
038 import org.apache.oozie.util.DateUtils;
039 import org.apache.oozie.util.ParamChecker;
040 import org.apache.oozie.util.PropertiesUtils;
041 import org.apache.oozie.util.WritableUtils;
042 import org.apache.openjpa.persistence.jdbc.Index;
043
044 /**
045 * Bean that contains all the information to start an action for a workflow node.
046 */
047 @Entity
048 @NamedQueries({
049
050 @NamedQuery(name = "UPDATE_ACTION", query = "update WorkflowActionBean a set a.conf = :conf, a.consoleUrl = :consoleUrl, a.data = :data, a.stats = :stats, a.externalChildIDs = :externalChildIDs, a.errorCode = :errorCode, a.errorMessage = :errorMessage, a.externalId = :externalId, a.externalStatus = :externalStatus, a.name = :name, a.cred = :cred , a.retries = :retries, a.trackerUri = :trackerUri, a.transition = :transition, a.type = :type, a.endTimestamp = :endTime, a.executionPath = :executionPath, a.lastCheckTimestamp = :lastCheckTime, a.logToken = :logToken, a.pending = :pending, a.pendingAgeTimestamp = :pendingAge, a.signalValue = :signalValue, a.slaXml = :slaXml, a.startTimestamp = :startTime, a.status = :status, a.wfId=:wfId where a.id = :id"),
051
052 @NamedQuery(name = "DELETE_ACTION", query = "delete from WorkflowActionBean a where a.id = :id"),
053
054 @NamedQuery(name = "DELETE_ACTIONS_FOR_WORKFLOW", query = "delete from WorkflowActionBean a where a.wfId = :wfId"),
055
056 @NamedQuery(name = "GET_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a"),
057
058 @NamedQuery(name = "GET_ACTION", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),
059
060 @NamedQuery(name = "GET_ACTION_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),
061
062 @NamedQuery(name = "GET_ACTION_FOR_SLA", query = "select a.id, a.status, a.startTimestamp, a.endTimestamp from WorkflowActionBean a where a.id = :id"),
063
064 @NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
065
066 @NamedQuery(name = "GET_ACTIONS_OF_WORKFLOW_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
067
068 @NamedQuery(name = "GET_PENDING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.pendingAgeTimestamp < :pendingAge AND a.status <> 'RUNNING'"),
069
070 @NamedQuery(name = "GET_RUNNING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.status = 'RUNNING' AND a.lastCheckTimestamp < :lastCheckTime"),
071
072 @NamedQuery(name = "GET_RETRY_MANUAL_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId AND (a.status = 'START_RETRY' OR a.status = 'START_MANUAL' OR a.status = 'END_RETRY' OR a.status = 'END_MANUAL')") })
073
074 public class WorkflowActionBean extends JsonWorkflowAction implements Writable {
075
076 @Basic
077 @Index
078 @Column(name = "wf_id")
079 private String wfId = null;
080
081 @Basic
082 @Index
083 @Column(name = "status")
084 private String status = WorkflowAction.Status.PREP.toString();
085
086 @Basic
087 @Column(name = "last_check_time")
088 private java.sql.Timestamp lastCheckTimestamp;
089
090 @Basic
091 @Column(name = "end_time")
092 private java.sql.Timestamp endTimestamp = null;
093
094 @Basic
095 @Column(name = "start_time")
096 private java.sql.Timestamp startTimestamp = null;
097
098 @Basic
099 @Column(name = "execution_path", length = 1024)
100 private String executionPath = null;
101
102 @Basic
103 @Column(name = "pending")
104 private int pending = 0;
105
106 // @Temporal(TemporalType.TIME)
107 // @Column(name="pending_age",columnDefinition="timestamp default '0000-00-00 00:00:00'")
108 @Basic
109 @Index
110 @Column(name = "pending_age")
111 private java.sql.Timestamp pendingAgeTimestamp = null;
112
113 @Basic
114 @Column(name = "signal_value")
115 private String signalValue = null;
116
117 @Basic
118 @Column(name = "log_token")
119 private String logToken = null;
120
121 @Transient
122 private Date pendingAge;
123
124 @Column(name = "sla_xml")
125 @Lob
126 private String slaXml = null;
127
128 /**
129 * Default constructor.
130 */
131 public WorkflowActionBean() {
132 }
133
134 /**
135 * Serialize the action bean to a data output.
136 *
137 * @param dataOutput data output.
138 * @throws IOException thrown if the action bean could not be serialized.
139 */
140
141 public void write(DataOutput dataOutput) throws IOException {
142 WritableUtils.writeStr(dataOutput, getId());
143 WritableUtils.writeStr(dataOutput, getName());
144 WritableUtils.writeStr(dataOutput, getCred());
145 WritableUtils.writeStr(dataOutput, getType());
146 WritableUtils.writeStr(dataOutput, getConf());
147 WritableUtils.writeStr(dataOutput, getStatusStr());
148 dataOutput.writeInt(getRetries());
149 dataOutput.writeLong((getStartTime() != null) ? getStartTime().getTime() : -1);
150 dataOutput.writeLong((getEndTime() != null) ? getEndTime().getTime() : -1);
151 dataOutput.writeLong((getLastCheckTime() != null) ? getLastCheckTime().getTime() : -1);
152 WritableUtils.writeStr(dataOutput, getTransition());
153 WritableUtils.writeStr(dataOutput, getData());
154 WritableUtils.writeStr(dataOutput, getStats());
155 WritableUtils.writeStr(dataOutput, getExternalChildIDs());
156 WritableUtils.writeStr(dataOutput, getExternalId());
157 WritableUtils.writeStr(dataOutput, getExternalStatus());
158 WritableUtils.writeStr(dataOutput, getTrackerUri());
159 WritableUtils.writeStr(dataOutput, getConsoleUrl());
160 WritableUtils.writeStr(dataOutput, getErrorCode());
161 WritableUtils.writeStr(dataOutput, getErrorMessage());
162 WritableUtils.writeStr(dataOutput, wfId);
163 WritableUtils.writeStr(dataOutput, executionPath);
164 dataOutput.writeInt(pending);
165 dataOutput.writeLong((pendingAge != null) ? pendingAge.getTime() : -1);
166 WritableUtils.writeStr(dataOutput, signalValue);
167 WritableUtils.writeStr(dataOutput, logToken);
168 dataOutput.writeInt(getUserRetryCount());
169 dataOutput.writeInt(getUserRetryInterval());
170 dataOutput.writeInt(getUserRetryMax());
171 }
172
173 /**
174 * Deserialize an action bean from a data input.
175 *
176 * @param dataInput data input.
177 * @throws IOException thrown if the action bean could not be deserialized.
178 */
179 public void readFields(DataInput dataInput) throws IOException {
180 setId(WritableUtils.readStr(dataInput));
181 setName(WritableUtils.readStr(dataInput));
182 setCred(WritableUtils.readStr(dataInput));
183 setType(WritableUtils.readStr(dataInput));
184 setConf(WritableUtils.readStr(dataInput));
185 setStatus(WorkflowAction.Status.valueOf(WritableUtils.readStr(dataInput)));
186 setRetries(dataInput.readInt());
187 long d = dataInput.readLong();
188 if (d != -1) {
189 setStartTime(new Date(d));
190 }
191 d = dataInput.readLong();
192 if (d != -1) {
193 setEndTime(new Date(d));
194 }
195 d = dataInput.readLong();
196 if (d != -1) {
197 setLastCheckTime(new Date(d));
198 }
199 setTransition(WritableUtils.readStr(dataInput));
200 setData(WritableUtils.readStr(dataInput));
201 setStats(WritableUtils.readStr(dataInput));
202 setExternalChildIDs(WritableUtils.readStr(dataInput));
203 setExternalId(WritableUtils.readStr(dataInput));
204 setExternalStatus(WritableUtils.readStr(dataInput));
205 setTrackerUri(WritableUtils.readStr(dataInput));
206 setConsoleUrl(WritableUtils.readStr(dataInput));
207 setErrorInfo(WritableUtils.readStr(dataInput), WritableUtils.readStr(dataInput));
208 wfId = WritableUtils.readStr(dataInput);
209 executionPath = WritableUtils.readStr(dataInput);
210 pending = dataInput.readInt();
211 d = dataInput.readLong();
212 if (d != -1) {
213 pendingAge = new Date(d);
214 pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
215 }
216 signalValue = WritableUtils.readStr(dataInput);
217 logToken = WritableUtils.readStr(dataInput);
218 setUserRetryCount(dataInput.readInt());
219 setUserRetryInterval(dataInput.readInt());
220 setUserRetryMax(dataInput.readInt());
221 }
222
223 /**
224 * Return whether workflow action in terminal state or not
225 *
226 * @return
227 */
228 public boolean inTerminalState() {
229 boolean isTerminalState = false;
230 switch (WorkflowAction.Status.valueOf(status)) {
231 case ERROR:
232 case FAILED:
233 case KILLED:
234 case OK:
235 isTerminalState = true;
236 break;
237 default:
238 break;
239 }
240 return isTerminalState;
241 }
242
243 /**
244 * Return if the action execution is complete.
245 *
246 * @return if the action start is complete.
247 */
248 public boolean isExecutionComplete() {
249 return getStatus() == WorkflowAction.Status.DONE;
250 }
251
252 /**
253 * Return if the action is START_RETRY or START_MANUAL or END_RETRY or
254 * END_MANUAL.
255 *
256 * @return boolean true if status is START_RETRY or START_MANUAL or END_RETRY or
257 * END_MANUAL
258 */
259 public boolean isRetryOrManual() {
260 return (getStatus() == WorkflowAction.Status.START_RETRY || getStatus() == WorkflowAction.Status.START_MANUAL
261 || getStatus() == WorkflowAction.Status.END_RETRY || getStatus() == WorkflowAction.Status.END_MANUAL);
262 }
263
264 /**
265 * Return true if the action is USER_RETRY
266 *
267 * @return boolean true if status is USER_RETRY
268 */
269 public boolean isUserRetry() {
270 return (getStatus() == WorkflowAction.Status.USER_RETRY);
271 }
272
273 /**
274 * Return if the action is complete.
275 *
276 * @return if the action is complete.
277 */
278 public boolean isComplete() {
279 return getStatus() == WorkflowAction.Status.OK || getStatus() == WorkflowAction.Status.KILLED ||
280 getStatus() == WorkflowAction.Status.ERROR;
281 }
282
283 /**
284 * Return if the action is complete with failure.
285 *
286 * @return if the action is complete with failure.
287 */
288 public boolean isTerminalWithFailure() {
289 boolean result = false;
290 switch (getStatus()) {
291 case FAILED:
292 case KILLED:
293 case ERROR:
294 result = true;
295 }
296 return result;
297 }
298
299 /**
300 * Set the action pending flag to true.
301 */
302 public void setPendingOnly() {
303 pending = 1;
304 }
305
306 /**
307 * Set the action as pending and the current time as pending.
308 */
309 public void setPending() {
310 pending = 1;
311 pendingAge = new Date();
312 pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
313 }
314
315 /**
316 * Set a time when the action will be pending, normally a time in the future.
317 *
318 * @param pendingAge the time when the action will be pending.
319 */
320 public void setPendingAge(Date pendingAge) {
321 this.pendingAge = pendingAge;
322 this.pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
323 }
324
325 /**
326 * Return the pending age of the action.
327 *
328 * @return the pending age of the action, <code>null</code> if the action is not pending.
329 */
330 public Date getPendingAge() {
331 return DateUtils.toDate(pendingAgeTimestamp);
332 }
333
334 /**
335 * Return if the action is pending.
336 *
337 * @return if the action is pending.
338 */
339 public boolean isPending() {
340 return pending == 1 ? true : false;
341 }
342
343 /**
344 * Removes the pending flag and pendingAge from the action.
345 */
346 public void resetPending() {
347 pending = 0;
348 pendingAge = null;
349 pendingAgeTimestamp = null;
350 }
351
352 /**
353 * Removes the pending flag from the action.
354 */
355 public void resetPendingOnly() {
356 pending = 0;
357 }
358
359 /**
360 * Increments the number of retries for the action.
361 */
362 public void incRetries() {
363 setRetries(getRetries() + 1);
364 }
365
366 /**
367 * Set a tracking information for an action, and set the action status to {@link Action.Status#DONE}
368 *
369 * @param externalId external ID for the action.
370 * @param trackerUri tracker URI for the action.
371 * @param consoleUrl console URL for the action.
372 */
373 public void setStartData(String externalId, String trackerUri, String consoleUrl) {
374 setExternalId(ParamChecker.notEmpty(externalId, "externalId"));
375 setTrackerUri(ParamChecker.notEmpty(trackerUri, "trackerUri"));
376 setConsoleUrl(ParamChecker.notEmpty(consoleUrl, "consoleUrl"));
377 Date now = new Date();
378 if (this.startTimestamp == null) {
379 setStartTime(now);
380 }
381 setLastCheckTime(now);
382 setStatus(Status.RUNNING);
383 }
384
385 /**
386 * Set the completion information for an action start. Sets the Action status to {@link Action.Status#DONE}
387 *
388 * @param externalStatus action external end status.
389 * @param actionData action output data, <code>null</code> if there is no action output data.
390 */
391 public void setExecutionData(String externalStatus, Properties actionData) {
392 setStatus(Status.DONE);
393 setExternalStatus(ParamChecker.notEmpty(externalStatus, "externalStatus"));
394 if (actionData != null) {
395 setData(PropertiesUtils.propertiesToString(actionData));
396 }
397 }
398
399 /**
400 * Return the action statistics info.
401 *
402 * @return Json representation of the stats.
403 */
404 public String getExecutionStats() {
405 return getStats();
406 }
407
408 /**
409 * Set the action statistics info for the workflow action.
410 *
411 * @param Json representation of the stats.
412 */
413 public void setExecutionStats(String jsonStats) {
414 setStats(jsonStats);
415 }
416
417 /**
418 * Return the external child IDs.
419 *
420 * @return externalChildIDs as a string.
421 */
422 public String getExternalChildIDs() {
423 return super.getExternalChildIDs();
424 }
425
426 /**
427 * Set the external child IDs for the workflow action.
428 *
429 * @param externalChildIDs as a string.
430 */
431 public void setExternalChildIDs(String externalChildIDs) {
432 super.setExternalChildIDs(externalChildIDs);
433 }
434
435 /**
436 * Set the completion information for an action end.
437 *
438 * @param status action status, {@link Action.Status#OK} or {@link Action.Status#ERROR} or {@link
439 * Action.Status#KILLED}
440 * @param signalValue the signal value. In most cases, the value should be OK or ERROR.
441 */
442 public void setEndData(Status status, String signalValue) {
443 if (status == null || (status != Status.OK && status != Status.ERROR && status != Status.KILLED)) {
444 throw new IllegalArgumentException("Action status must be OK, ERROR or KILLED. Received ["
445 + status.toString() + "]");
446 }
447 if (status == Status.OK) {
448 setErrorInfo(null, null);
449 }
450 setStatus(status);
451 setSignalValue(ParamChecker.notEmpty(signalValue, "signalValue"));
452 }
453
454
455 /**
456 * Return the job Id.
457 *
458 * @return the job Id.
459 */
460 public String getJobId() {
461 return wfId;
462 }
463
464 /**
465 * Return the job Id.
466 *
467 * @return the job Id.
468 */
469 public String getWfId() {
470 return wfId;
471 }
472
473 /**
474 * Set the job id.
475 *
476 * @param id jobId;
477 */
478 public void setJobId(String id) {
479 this.wfId = id;
480 }
481
482 public String getSlaXml() {
483 return slaXml;
484 }
485
486 public void setSlaXml(String slaXml) {
487 this.slaXml = slaXml;
488 }
489
490 @Override
491 public void setStatus(Status val) {
492 this.status = val.toString();
493 super.setStatus(val);
494 }
495
496 public String getStatusStr() {
497 return status;
498 }
499
500 @Override
501 public Status getStatus() {
502 return Status.valueOf(this.status);
503 }
504
505 /**
506 * Return the node execution path.
507 *
508 * @return the node execution path.
509 */
510 public String getExecutionPath() {
511 return executionPath;
512 }
513
514 /**
515 * Set the node execution path.
516 *
517 * @param executionPath the node execution path.
518 */
519 public void setExecutionPath(String executionPath) {
520 this.executionPath = executionPath;
521 }
522
523 /**
524 * Return the signal value for the action. <p/> For decision nodes it is the choosen transition, for actions it is
525 * OK or ERROR.
526 *
527 * @return the action signal value.
528 */
529 public String getSignalValue() {
530 return signalValue;
531 }
532
533 /**
534 * Set the signal value for the action. <p/> For decision nodes it is the choosen transition, for actions it is OK
535 * or ERROR.
536 *
537 * @param signalValue the action signal value.
538 */
539 public void setSignalValue(String signalValue) {
540 this.signalValue = signalValue;
541 }
542
543 /**
544 * Return the job log token.
545 *
546 * @return the job log token.
547 */
548 public String getLogToken() {
549 return logToken;
550 }
551
552 /**
553 * Set the job log token.
554 *
555 * @param logToken the job log token.
556 */
557 public void setLogToken(String logToken) {
558 this.logToken = logToken;
559 }
560
561 /**
562 * Return the action last check time
563 *
564 * @return the last check time
565 */
566 public Date getLastCheckTime() {
567 return DateUtils.toDate(lastCheckTimestamp);
568 }
569
570 /**
571 * Return the action last check time
572 *
573 * @return the last check time
574 */
575 public Timestamp getLastCheckTimestamp() {
576 return lastCheckTimestamp;
577 }
578
579 /**
580 * Return the action last check time
581 *
582 * @return the last check time
583 */
584 public Timestamp getStartTimestamp() {
585 return startTimestamp;
586 }
587
588 /**
589 * Return the action last check time
590 *
591 * @return the last check time
592 */
593 public Timestamp getEndTimestamp() {
594 return endTimestamp;
595 }
596
597
598 /**
599 * Return the action last check time
600 *
601 * @return the last check time
602 */
603 public Timestamp getPendingAgeTimestamp() {
604 return pendingAgeTimestamp;
605 }
606
607 /**
608 * Sets the action last check time
609 *
610 * @param lastCheckTime the last check time to set.
611 */
612 public void setLastCheckTime(Date lastCheckTime) {
613 this.lastCheckTimestamp = DateUtils.convertDateToTimestamp(lastCheckTime);
614 }
615
616 public boolean getPending() {
617 return this.pending == 1 ? true : false;
618 }
619
620 @Override
621 public Date getStartTime() {
622 return DateUtils.toDate(startTimestamp);
623 }
624
625 @Override
626 public void setStartTime(Date startTime) {
627 super.setStartTime(startTime);
628 this.startTimestamp = DateUtils.convertDateToTimestamp(startTime);
629 }
630
631 @Override
632 public Date getEndTime() {
633 return DateUtils.toDate(endTimestamp);
634 }
635
636 @Override
637 public void setEndTime(Date endTime) {
638 super.setEndTime(endTime);
639 this.endTimestamp = DateUtils.convertDateToTimestamp(endTime);
640 }
641
642 }