This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie;
019
020 import java.io.DataInput;
021 import java.io.DataOutput;
022 import java.io.IOException;
023 import java.sql.Timestamp;
024 import java.util.Date;
025 import java.util.Properties;
026
027 import javax.persistence.Basic;
028 import javax.persistence.Column;
029 import javax.persistence.Entity;
030 import javax.persistence.Lob;
031 import javax.persistence.NamedQueries;
032 import javax.persistence.NamedQuery;
033 import javax.persistence.Transient;
034
035 import org.apache.hadoop.io.Writable;
036 import org.apache.oozie.client.WorkflowAction;
037 import org.apache.oozie.client.rest.JsonWorkflowAction;
038 import org.apache.oozie.util.DateUtils;
039 import org.apache.oozie.util.ParamChecker;
040 import org.apache.oozie.util.PropertiesUtils;
041 import org.apache.oozie.util.WritableUtils;
042 import org.apache.openjpa.persistence.jdbc.Index;
043
044 /**
045 * Bean that contains all the information to start an action for a workflow node.
046 */
047 @Entity
048 @NamedQueries({
049
050 @NamedQuery(name = "UPDATE_ACTION", query = "update WorkflowActionBean a set a.conf = :conf, a.consoleUrl = :consoleUrl, a.data = :data, a.errorCode = :errorCode, a.errorMessage = :errorMessage, a.externalId = :externalId, a.externalStatus = :externalStatus, a.name = :name, a.cred = :cred , a.retries = :retries, a.trackerUri = :trackerUri, a.transition = :transition, a.type = :type, a.endTimestamp = :endTime, a.executionPath = :executionPath, a.lastCheckTimestamp = :lastCheckTime, a.logToken = :logToken, a.pending = :pending, a.pendingAgeTimestamp = :pendingAge, a.signalValue = :signalValue, a.slaXml = :slaXml, a.startTimestamp = :startTime, a.status = :status, a.wfId=:wfId where a.id = :id"),
051
052 @NamedQuery(name = "DELETE_ACTION", query = "delete from WorkflowActionBean a where a.id = :id"),
053
054 @NamedQuery(name = "DELETE_ACTIONS_FOR_WORKFLOW", query = "delete from WorkflowActionBean a where a.wfId = :wfId"),
055
056 @NamedQuery(name = "GET_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a"),
057
058 @NamedQuery(name = "GET_ACTION", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),
059
060 @NamedQuery(name = "GET_ACTION_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),
061
062 @NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
063
064 @NamedQuery(name = "GET_ACTIONS_OF_WORKFLOW_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
065
066 @NamedQuery(name = "GET_PENDING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.pendingAgeTimestamp < :pendingAge AND a.status <> 'RUNNING'"),
067
068 @NamedQuery(name = "GET_RUNNING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.status = 'RUNNING' AND a.lastCheckTimestamp < :lastCheckTime"),
069
070 @NamedQuery(name = "GET_RETRY_MANUAL_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId AND (a.status = 'START_RETRY' OR a.status = 'START_MANUAL' OR a.status = 'END_RETRY' OR a.status = 'END_MANUAL')") })
071
072 public class WorkflowActionBean extends JsonWorkflowAction implements Writable {
073
074 @Basic
075 @Index
076 @Column(name = "wf_id")
077 private String wfId = null;
078
079 @Basic
080 @Index
081 @Column(name = "status")
082 private String status = WorkflowAction.Status.PREP.toString();
083
084 @Basic
085 @Column(name = "last_check_time")
086 private java.sql.Timestamp lastCheckTimestamp;
087
088 @Basic
089 @Column(name = "end_time")
090 private java.sql.Timestamp endTimestamp = null;
091
092 @Basic
093 @Column(name = "start_time")
094 private java.sql.Timestamp startTimestamp = null;
095
096 @Basic
097 @Column(name = "execution_path", length = 1024)
098 private String executionPath = null;
099
100 @Basic
101 @Column(name = "pending")
102 private int pending = 0;
103
104 // @Temporal(TemporalType.TIME)
105 // @Column(name="pending_age",columnDefinition="timestamp default '0000-00-00 00:00:00'")
106 @Basic
107 @Index
108 @Column(name = "pending_age")
109 private java.sql.Timestamp pendingAgeTimestamp = null;
110
111 @Basic
112 @Column(name = "signal_value")
113 private String signalValue = null;
114
115 @Basic
116 @Column(name = "log_token")
117 private String logToken = null;
118
119 @Transient
120 private Date pendingAge;
121
122 @Column(name = "sla_xml")
123 @Lob
124 private String slaXml = null;
125
126 /**
127 * Default constructor.
128 */
129 public WorkflowActionBean() {
130 }
131
132 /**
133 * Serialize the action bean to a data output.
134 *
135 * @param dataOutput data output.
136 * @throws IOException thrown if the action bean could not be serialized.
137 */
138
139 public void write(DataOutput dataOutput) throws IOException {
140 WritableUtils.writeStr(dataOutput, getId());
141 WritableUtils.writeStr(dataOutput, getName());
142 WritableUtils.writeStr(dataOutput, getCred());
143 WritableUtils.writeStr(dataOutput, getType());
144 WritableUtils.writeStr(dataOutput, getConf());
145 WritableUtils.writeStr(dataOutput, getStatusStr());
146 dataOutput.writeInt(getRetries());
147 dataOutput.writeLong((getStartTime() != null) ? getStartTime().getTime() : -1);
148 dataOutput.writeLong((getEndTime() != null) ? getEndTime().getTime() : -1);
149 dataOutput.writeLong((getLastCheckTime() != null) ? getLastCheckTime().getTime() : -1);
150 WritableUtils.writeStr(dataOutput, getTransition());
151 WritableUtils.writeStr(dataOutput, getData());
152 WritableUtils.writeStr(dataOutput, getExternalId());
153 WritableUtils.writeStr(dataOutput, getExternalStatus());
154 WritableUtils.writeStr(dataOutput, getTrackerUri());
155 WritableUtils.writeStr(dataOutput, getConsoleUrl());
156 WritableUtils.writeStr(dataOutput, getErrorCode());
157 WritableUtils.writeStr(dataOutput, getErrorMessage());
158 WritableUtils.writeStr(dataOutput, wfId);
159 WritableUtils.writeStr(dataOutput, executionPath);
160 dataOutput.writeInt(pending);
161 dataOutput.writeLong((pendingAge != null) ? pendingAge.getTime() : -1);
162 WritableUtils.writeStr(dataOutput, signalValue);
163 WritableUtils.writeStr(dataOutput, logToken);
164 dataOutput.writeInt(getUserRetryCount());
165 dataOutput.writeInt(getUserRetryInterval());
166 dataOutput.writeInt(getUserRetryMax());
167 }
168
169 /**
170 * Deserialize an action bean from a data input.
171 *
172 * @param dataInput data input.
173 * @throws IOException thrown if the action bean could not be deserialized.
174 */
175 public void readFields(DataInput dataInput) throws IOException {
176 setId(WritableUtils.readStr(dataInput));
177 setName(WritableUtils.readStr(dataInput));
178 setCred(WritableUtils.readStr(dataInput));
179 setType(WritableUtils.readStr(dataInput));
180 setConf(WritableUtils.readStr(dataInput));
181 setStatus(WorkflowAction.Status.valueOf(WritableUtils.readStr(dataInput)));
182 setRetries(dataInput.readInt());
183 long d = dataInput.readLong();
184 if (d != -1) {
185 setStartTime(new Date(d));
186 }
187 d = dataInput.readLong();
188 if (d != -1) {
189 setEndTime(new Date(d));
190 }
191 d = dataInput.readLong();
192 if (d != -1) {
193 setLastCheckTime(new Date(d));
194 }
195 setTransition(WritableUtils.readStr(dataInput));
196 setData(WritableUtils.readStr(dataInput));
197 setExternalId(WritableUtils.readStr(dataInput));
198 setExternalStatus(WritableUtils.readStr(dataInput));
199 setTrackerUri(WritableUtils.readStr(dataInput));
200 setConsoleUrl(WritableUtils.readStr(dataInput));
201 setErrorInfo(WritableUtils.readStr(dataInput), WritableUtils.readStr(dataInput));
202 wfId = WritableUtils.readStr(dataInput);
203 executionPath = WritableUtils.readStr(dataInput);
204 pending = dataInput.readInt();
205 d = dataInput.readLong();
206 if (d != -1) {
207 pendingAge = new Date(d);
208 pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
209 }
210 signalValue = WritableUtils.readStr(dataInput);
211 logToken = WritableUtils.readStr(dataInput);
212 setUserRetryCount(dataInput.readInt());
213 setUserRetryInterval(dataInput.readInt());
214 setUserRetryMax(dataInput.readInt());
215 }
216
217 /**
218 * Return if the action execution is complete.
219 *
220 * @return if the action start is complete.
221 */
222 public boolean isExecutionComplete() {
223 return getStatus() == WorkflowAction.Status.DONE;
224 }
225
226 /**
227 * Return if the action is START_RETRY or START_MANUAL or END_RETRY or
228 * END_MANUAL.
229 *
230 * @return boolean true if status is START_RETRY or START_MANUAL or END_RETRY or
231 * END_MANUAL
232 */
233 public boolean isRetryOrManual() {
234 return (getStatus() == WorkflowAction.Status.START_RETRY || getStatus() == WorkflowAction.Status.START_MANUAL
235 || getStatus() == WorkflowAction.Status.END_RETRY || getStatus() == WorkflowAction.Status.END_MANUAL);
236 }
237
238 /**
239 * Return true if the action is USER_RETRY
240 *
241 * @return boolean true if status is USER_RETRY
242 */
243 public boolean isUserRetry() {
244 return (getStatus() == WorkflowAction.Status.USER_RETRY);
245 }
246
247 /**
248 * Return if the action is complete.
249 *
250 * @return if the action is complete.
251 */
252 public boolean isComplete() {
253 return getStatus() == WorkflowAction.Status.OK || getStatus() == WorkflowAction.Status.KILLED ||
254 getStatus() == WorkflowAction.Status.ERROR;
255 }
256
257 /**
258 * Set the action pending flag to true.
259 */
260 public void setPendingOnly() {
261 pending = 1;
262 }
263
264 /**
265 * Set the action as pending and the current time as pending.
266 */
267 public void setPending() {
268 pending = 1;
269 pendingAge = new Date();
270 pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
271 }
272
273 /**
274 * Set a time when the action will be pending, normally a time in the future.
275 *
276 * @param pendingAge the time when the action will be pending.
277 */
278 public void setPendingAge(Date pendingAge) {
279 this.pendingAge = pendingAge;
280 this.pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
281 }
282
283 /**
284 * Return the pending age of the action.
285 *
286 * @return the pending age of the action, <code>null</code> if the action is not pending.
287 */
288 public Date getPendingAge() {
289 return DateUtils.toDate(pendingAgeTimestamp);
290 }
291
292 /**
293 * Return if the action is pending.
294 *
295 * @return if the action is pending.
296 */
297 public boolean isPending() {
298 return pending == 1 ? true : false;
299 }
300
301 /**
302 * Removes the pending flag and pendingAge from the action.
303 */
304 public void resetPending() {
305 pending = 0;
306 pendingAge = null;
307 pendingAgeTimestamp = null;
308 }
309
310 /**
311 * Removes the pending flag from the action.
312 */
313 public void resetPendingOnly() {
314 pending = 0;
315 }
316
317 /**
318 * Increments the number of retries for the action.
319 */
320 public void incRetries() {
321 setRetries(getRetries() + 1);
322 }
323
324 /**
325 * Set a tracking information for an action, and set the action status to {@link Action.Status#DONE}
326 *
327 * @param externalId external ID for the action.
328 * @param trackerUri tracker URI for the action.
329 * @param consoleUrl console URL for the action.
330 */
331 public void setStartData(String externalId, String trackerUri, String consoleUrl) {
332 setExternalId(ParamChecker.notEmpty(externalId, "externalId"));
333 setTrackerUri(ParamChecker.notEmpty(trackerUri, "trackerUri"));
334 setConsoleUrl(ParamChecker.notEmpty(consoleUrl, "consoleUrl"));
335 Date now = new Date();
336 setStartTime(now);
337 setLastCheckTime(now);
338 setStatus(Status.RUNNING);
339 }
340
341 /**
342 * Set the completion information for an action start. Sets the Action status to {@link Action.Status#DONE}
343 *
344 * @param externalStatus action external end status.
345 * @param actionData action output data, <code>null</code> if there is no action output data.
346 */
347 public void setExecutionData(String externalStatus, Properties actionData) {
348 setStatus(Status.DONE);
349 setExternalStatus(ParamChecker.notEmpty(externalStatus, "externalStatus"));
350 if (actionData != null) {
351 setData(PropertiesUtils.propertiesToString(actionData));
352 }
353 }
354
355 /**
356 * Set the completion information for an action end.
357 *
358 * @param status action status, {@link Action.Status#OK} or {@link Action.Status#ERROR} or {@link
359 * Action.Status#KILLED}
360 * @param signalValue the signal value. In most cases, the value should be OK or ERROR.
361 */
362 public void setEndData(Status status, String signalValue) {
363 if (status == null || (status != Status.OK && status != Status.ERROR && status != Status.KILLED)) {
364 throw new IllegalArgumentException("Action status must be OK, ERROR or KILLED. Received ["
365 + status.toString() + "]");
366 }
367 if (status == Status.OK) {
368 setErrorInfo(null, null);
369 }
370 setStatus(status);
371 setSignalValue(ParamChecker.notEmpty(signalValue, "signalValue"));
372 }
373
374
375 /**
376 * Return the job Id.
377 *
378 * @return the job Id.
379 */
380 public String getJobId() {
381 return wfId;
382 }
383
384 /**
385 * Return the job Id.
386 *
387 * @return the job Id.
388 */
389 public String getWfId() {
390 return wfId;
391 }
392
393 /**
394 * Set the job id.
395 *
396 * @param id jobId;
397 */
398 public void setJobId(String id) {
399 this.wfId = id;
400 }
401
402 public String getSlaXml() {
403 return slaXml;
404 }
405
406 public void setSlaXml(String slaXml) {
407 this.slaXml = slaXml;
408 }
409
410 @Override
411 public void setStatus(Status val) {
412 this.status = val.toString();
413 super.setStatus(val);
414 }
415
416 public String getStatusStr() {
417 return status;
418 }
419
420 @Override
421 public Status getStatus() {
422 return Status.valueOf(this.status);
423 }
424
425 /**
426 * Return the node execution path.
427 *
428 * @return the node execution path.
429 */
430 public String getExecutionPath() {
431 return executionPath;
432 }
433
434 /**
435 * Set the node execution path.
436 *
437 * @param executionPath the node execution path.
438 */
439 public void setExecutionPath(String executionPath) {
440 this.executionPath = executionPath;
441 }
442
443 /**
444 * Return the signal value for the action. <p/> For decision nodes it is the choosen transition, for actions it is
445 * OK or ERROR.
446 *
447 * @return the action signal value.
448 */
449 public String getSignalValue() {
450 return signalValue;
451 }
452
453 /**
454 * Set the signal value for the action. <p/> For decision nodes it is the choosen transition, for actions it is OK
455 * or ERROR.
456 *
457 * @param signalValue the action signal value.
458 */
459 public void setSignalValue(String signalValue) {
460 this.signalValue = signalValue;
461 }
462
463 /**
464 * Return the job log token.
465 *
466 * @return the job log token.
467 */
468 public String getLogToken() {
469 return logToken;
470 }
471
472 /**
473 * Set the job log token.
474 *
475 * @param logToken the job log token.
476 */
477 public void setLogToken(String logToken) {
478 this.logToken = logToken;
479 }
480
481 /**
482 * Return the action last check time
483 *
484 * @return the last check time
485 */
486 public Date getLastCheckTime() {
487 return DateUtils.toDate(lastCheckTimestamp);
488 }
489
490 /**
491 * Return the action last check time
492 *
493 * @return the last check time
494 */
495 public Timestamp getLastCheckTimestamp() {
496 return lastCheckTimestamp;
497 }
498
499 /**
500 * Return the action last check time
501 *
502 * @return the last check time
503 */
504 public Timestamp getStartTimestamp() {
505 return startTimestamp;
506 }
507
508 /**
509 * Return the action last check time
510 *
511 * @return the last check time
512 */
513 public Timestamp getEndTimestamp() {
514 return endTimestamp;
515 }
516
517
518 /**
519 * Return the action last check time
520 *
521 * @return the last check time
522 */
523 public Timestamp getPendingAgeTimestamp() {
524 return pendingAgeTimestamp;
525 }
526
527 /**
528 * Sets the action last check time
529 *
530 * @param lastCheckTime the last check time to set.
531 */
532 public void setLastCheckTime(Date lastCheckTime) {
533 this.lastCheckTimestamp = DateUtils.convertDateToTimestamp(lastCheckTime);
534 }
535
536 public boolean getPending() {
537 return this.pending == 1 ? true : false;
538 }
539
540 @Override
541 public Date getStartTime() {
542 return DateUtils.toDate(startTimestamp);
543 }
544
545 @Override
546 public void setStartTime(Date startTime) {
547 super.setStartTime(startTime);
548 this.startTimestamp = DateUtils.convertDateToTimestamp(startTime);
549 }
550
551 @Override
552 public Date getEndTime() {
553 return DateUtils.toDate(endTimestamp);
554 }
555
556 @Override
557 public void setEndTime(Date endTime) {
558 super.setEndTime(endTime);
559 this.endTimestamp = DateUtils.convertDateToTimestamp(endTime);
560 }
561
562 }