001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie;
019    
020    import java.io.DataInput;
021    import java.io.DataOutput;
022    import java.io.IOException;
023    import java.sql.Timestamp;
024    import java.util.Date;
025    import java.util.Properties;
026    
027    import javax.persistence.Basic;
028    import javax.persistence.Column;
029    import javax.persistence.Entity;
030    import javax.persistence.Lob;
031    import javax.persistence.NamedQueries;
032    import javax.persistence.NamedQuery;
033    import javax.persistence.Transient;
034    
035    import org.apache.hadoop.io.Writable;
036    import org.apache.oozie.client.WorkflowAction;
037    import org.apache.oozie.client.rest.JsonWorkflowAction;
038    import org.apache.oozie.util.DateUtils;
039    import org.apache.oozie.util.ParamChecker;
040    import org.apache.oozie.util.PropertiesUtils;
041    import org.apache.oozie.util.WritableUtils;
042    import org.apache.openjpa.persistence.jdbc.Index;
043    
044    /**
045     * Bean that contains all the information to start an action for a workflow node.
046     */
047    @Entity
048    @NamedQueries({
049    
050        @NamedQuery(name = "UPDATE_ACTION", query = "update WorkflowActionBean a set a.conf = :conf, a.consoleUrl = :consoleUrl, a.data = :data, a.stats = :stats, a.externalChildIDs = :externalChildIDs, a.errorCode = :errorCode, a.errorMessage = :errorMessage, a.externalId = :externalId, a.externalStatus = :externalStatus, a.name = :name, a.cred = :cred , a.retries = :retries, a.trackerUri = :trackerUri, a.transition = :transition, a.type = :type, a.endTimestamp = :endTime, a.executionPath = :executionPath, a.lastCheckTimestamp = :lastCheckTime, a.logToken = :logToken, a.pending = :pending, a.pendingAgeTimestamp = :pendingAge, a.signalValue = :signalValue, a.slaXml = :slaXml, a.startTimestamp = :startTime, a.status = :status, a.wfId=:wfId where a.id = :id"),
051    
052        @NamedQuery(name = "DELETE_ACTION", query = "delete from WorkflowActionBean a where a.id = :id"),
053    
054        @NamedQuery(name = "DELETE_ACTIONS_FOR_WORKFLOW", query = "delete from WorkflowActionBean a where a.wfId = :wfId"),
055    
056        @NamedQuery(name = "GET_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a"),
057    
058        @NamedQuery(name = "GET_ACTION", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),
059    
060        @NamedQuery(name = "GET_ACTION_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),
061    
062        @NamedQuery(name = "GET_ACTION_FOR_SLA", query = "select a.id, a.status, a.startTimestamp, a.endTimestamp from WorkflowActionBean a where a.id = :id"),
063    
064        @NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
065    
066        @NamedQuery(name = "GET_ACTIONS_OF_WORKFLOW_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
067    
068        @NamedQuery(name = "GET_PENDING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.pendingAgeTimestamp < :pendingAge AND a.status <> 'RUNNING'"),
069    
070        @NamedQuery(name = "GET_RUNNING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.status = 'RUNNING' AND a.lastCheckTimestamp < :lastCheckTime"),
071    
072        @NamedQuery(name = "GET_RETRY_MANUAL_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId AND (a.status = 'START_RETRY' OR a.status = 'START_MANUAL' OR a.status = 'END_RETRY' OR a.status = 'END_MANUAL')") })
073    
074    public class WorkflowActionBean extends JsonWorkflowAction implements Writable {
075    
076        @Basic
077        @Index
078        @Column(name = "wf_id")
079        private String wfId = null;
080    
081        @Basic
082        @Index
083        @Column(name = "status")
084        private String status = WorkflowAction.Status.PREP.toString();
085    
086        @Basic
087        @Column(name = "last_check_time")
088        private java.sql.Timestamp lastCheckTimestamp;
089    
090        @Basic
091        @Column(name = "end_time")
092        private java.sql.Timestamp endTimestamp = null;
093    
094        @Basic
095        @Column(name = "start_time")
096        private java.sql.Timestamp startTimestamp = null;
097    
098        @Basic
099        @Column(name = "execution_path", length = 1024)
100        private String executionPath = null;
101    
102        @Basic
103        @Column(name = "pending")
104        private int pending = 0;
105    
106        // @Temporal(TemporalType.TIME)
107        // @Column(name="pending_age",columnDefinition="timestamp default '0000-00-00 00:00:00'")
108        @Basic
109        @Index
110        @Column(name = "pending_age")
111        private java.sql.Timestamp pendingAgeTimestamp = null;
112    
113        @Basic
114        @Column(name = "signal_value")
115        private String signalValue = null;
116    
117        @Basic
118        @Column(name = "log_token")
119        private String logToken = null;
120    
121        @Transient
122        private Date pendingAge;
123    
124        @Column(name = "sla_xml")
125        @Lob
126        private String slaXml = null;
127    
128        /**
129         * Default constructor.
130         */
131        public WorkflowActionBean() {
132        }
133    
134        /**
135         * Serialize the action bean to a data output.
136         *
137         * @param dataOutput data output.
138         * @throws IOException thrown if the action bean could not be serialized.
139         */
140    
141        public void write(DataOutput dataOutput) throws IOException {
142            WritableUtils.writeStr(dataOutput, getId());
143            WritableUtils.writeStr(dataOutput, getName());
144            WritableUtils.writeStr(dataOutput, getCred());
145            WritableUtils.writeStr(dataOutput, getType());
146            WritableUtils.writeStr(dataOutput, getConf());
147            WritableUtils.writeStr(dataOutput, getStatusStr());
148            dataOutput.writeInt(getRetries());
149            dataOutput.writeLong((getStartTime() != null) ? getStartTime().getTime() : -1);
150            dataOutput.writeLong((getEndTime() != null) ? getEndTime().getTime() : -1);
151            dataOutput.writeLong((getLastCheckTime() != null) ? getLastCheckTime().getTime() : -1);
152            WritableUtils.writeStr(dataOutput, getTransition());
153            WritableUtils.writeStr(dataOutput, getData());
154            WritableUtils.writeStr(dataOutput, getStats());
155            WritableUtils.writeStr(dataOutput, getExternalChildIDs());
156            WritableUtils.writeStr(dataOutput, getExternalId());
157            WritableUtils.writeStr(dataOutput, getExternalStatus());
158            WritableUtils.writeStr(dataOutput, getTrackerUri());
159            WritableUtils.writeStr(dataOutput, getConsoleUrl());
160            WritableUtils.writeStr(dataOutput, getErrorCode());
161            WritableUtils.writeStr(dataOutput, getErrorMessage());
162            WritableUtils.writeStr(dataOutput, wfId);
163            WritableUtils.writeStr(dataOutput, executionPath);
164            dataOutput.writeInt(pending);
165            dataOutput.writeLong((pendingAge != null) ? pendingAge.getTime() : -1);
166            WritableUtils.writeStr(dataOutput, signalValue);
167            WritableUtils.writeStr(dataOutput, logToken);
168            dataOutput.writeInt(getUserRetryCount());
169            dataOutput.writeInt(getUserRetryInterval());
170            dataOutput.writeInt(getUserRetryMax());
171        }
172    
173        /**
174         * Deserialize an action bean from a data input.
175         *
176         * @param dataInput data input.
177         * @throws IOException thrown if the action bean could not be deserialized.
178         */
179        public void readFields(DataInput dataInput) throws IOException {
180            setId(WritableUtils.readStr(dataInput));
181            setName(WritableUtils.readStr(dataInput));
182            setCred(WritableUtils.readStr(dataInput));
183            setType(WritableUtils.readStr(dataInput));
184            setConf(WritableUtils.readStr(dataInput));
185            setStatus(WorkflowAction.Status.valueOf(WritableUtils.readStr(dataInput)));
186            setRetries(dataInput.readInt());
187            long d = dataInput.readLong();
188            if (d != -1) {
189                setStartTime(new Date(d));
190            }
191            d = dataInput.readLong();
192            if (d != -1) {
193                setEndTime(new Date(d));
194            }
195            d = dataInput.readLong();
196            if (d != -1) {
197                setLastCheckTime(new Date(d));
198            }
199            setTransition(WritableUtils.readStr(dataInput));
200            setData(WritableUtils.readStr(dataInput));
201            setStats(WritableUtils.readStr(dataInput));
202            setExternalChildIDs(WritableUtils.readStr(dataInput));
203            setExternalId(WritableUtils.readStr(dataInput));
204            setExternalStatus(WritableUtils.readStr(dataInput));
205            setTrackerUri(WritableUtils.readStr(dataInput));
206            setConsoleUrl(WritableUtils.readStr(dataInput));
207            setErrorInfo(WritableUtils.readStr(dataInput), WritableUtils.readStr(dataInput));
208            wfId = WritableUtils.readStr(dataInput);
209            executionPath = WritableUtils.readStr(dataInput);
210            pending = dataInput.readInt();
211            d = dataInput.readLong();
212            if (d != -1) {
213                pendingAge = new Date(d);
214                pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
215            }
216            signalValue = WritableUtils.readStr(dataInput);
217            logToken = WritableUtils.readStr(dataInput);
218            setUserRetryCount(dataInput.readInt());
219            setUserRetryInterval(dataInput.readInt());
220            setUserRetryMax(dataInput.readInt());
221        }
222    
223        /**
224         * Return whether workflow action in terminal state or not
225         *
226         * @return
227         */
228        public boolean inTerminalState() {
229            boolean isTerminalState = false;
230            switch (WorkflowAction.Status.valueOf(status)) {
231                case ERROR:
232                case FAILED:
233                case KILLED:
234                case OK:
235                    isTerminalState = true;
236                    break;
237                default:
238                    break;
239            }
240            return isTerminalState;
241        }
242    
243        /**
244         * Return if the action execution is complete.
245         *
246         * @return if the action start is complete.
247         */
248        public boolean isExecutionComplete() {
249            return getStatus() == WorkflowAction.Status.DONE;
250        }
251    
252        /**
253         * Return if the action is START_RETRY or START_MANUAL or END_RETRY or
254         * END_MANUAL.
255         *
256         * @return boolean true if status is START_RETRY or START_MANUAL or END_RETRY or
257         *         END_MANUAL
258         */
259        public boolean isRetryOrManual() {
260            return (getStatus() == WorkflowAction.Status.START_RETRY || getStatus() == WorkflowAction.Status.START_MANUAL
261                    || getStatus() == WorkflowAction.Status.END_RETRY || getStatus() == WorkflowAction.Status.END_MANUAL);
262        }
263    
264        /**
265         * Return true if the action is USER_RETRY
266         *
267         * @return boolean true if status is USER_RETRY
268         */
269        public boolean isUserRetry() {
270            return (getStatus() == WorkflowAction.Status.USER_RETRY);
271        }
272    
273        /**
274         * Return if the action is complete.
275         *
276         * @return if the action is complete.
277         */
278        public boolean isComplete() {
279            return getStatus() == WorkflowAction.Status.OK || getStatus() == WorkflowAction.Status.KILLED ||
280                    getStatus() == WorkflowAction.Status.ERROR;
281        }
282    
283        /**
284         * Return if the action is complete with failure.
285         *
286         * @return if the action is complete with failure.
287         */
288        public boolean isTerminalWithFailure() {
289            boolean result = false;
290            switch (getStatus()) {
291                case FAILED:
292                case KILLED:
293                case ERROR:
294                    result = true;
295            }
296            return result;
297        }
298    
299        /**
300         * Set the action pending flag to true.
301         */
302        public void setPendingOnly() {
303            pending = 1;
304        }
305    
306        /**
307         * Set the action as pending and the current time as pending.
308         */
309        public void setPending() {
310            pending = 1;
311            pendingAge = new Date();
312            pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
313        }
314    
315        /**
316         * Set a time when the action will be pending, normally a time in the future.
317         *
318         * @param pendingAge the time when the action will be pending.
319         */
320        public void setPendingAge(Date pendingAge) {
321            this.pendingAge = pendingAge;
322            this.pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
323        }
324    
325        /**
326         * Return the pending age of the action.
327         *
328         * @return the pending age of the action, <code>null</code> if the action is not pending.
329         */
330        public Date getPendingAge() {
331            return DateUtils.toDate(pendingAgeTimestamp);
332        }
333    
334        /**
335         * Return if the action is pending.
336         *
337         * @return if the action is pending.
338         */
339        public boolean isPending() {
340            return pending == 1 ? true : false;
341        }
342    
343        /**
344         * Removes the pending flag and pendingAge from the action.
345         */
346        public void resetPending() {
347            pending = 0;
348            pendingAge = null;
349            pendingAgeTimestamp = null;
350        }
351    
352        /**
353         * Removes the pending flag from the action.
354         */
355        public void resetPendingOnly() {
356            pending = 0;
357        }
358    
359        /**
360         * Increments the number of retries for the action.
361         */
362        public void incRetries() {
363            setRetries(getRetries() + 1);
364        }
365    
366        /**
367         * Set a tracking information for an action, and set the action status to {@link Action.Status#DONE}
368         *
369         * @param externalId external ID for the action.
370         * @param trackerUri tracker URI for the action.
371         * @param consoleUrl console URL for the action.
372         */
373        public void setStartData(String externalId, String trackerUri, String consoleUrl) {
374            setExternalId(ParamChecker.notEmpty(externalId, "externalId"));
375            setTrackerUri(ParamChecker.notEmpty(trackerUri, "trackerUri"));
376            setConsoleUrl(ParamChecker.notEmpty(consoleUrl, "consoleUrl"));
377            Date now = new Date();
378            if (this.startTimestamp == null) {
379                setStartTime(now);
380            }
381            setLastCheckTime(now);
382            setStatus(Status.RUNNING);
383        }
384    
385        /**
386         * Set the completion information for an action start. Sets the Action status to {@link Action.Status#DONE}
387         *
388         * @param externalStatus action external end status.
389         * @param actionData action output data, <code>null</code> if there is no action output data.
390         */
391        public void setExecutionData(String externalStatus, Properties actionData) {
392            setStatus(Status.DONE);
393            setExternalStatus(ParamChecker.notEmpty(externalStatus, "externalStatus"));
394            if (actionData != null) {
395                setData(PropertiesUtils.propertiesToString(actionData));
396            }
397        }
398    
399        /**
400         * Return the action statistics info.
401         *
402         * @return Json representation of the stats.
403         */
404        public String getExecutionStats() {
405            return getStats();
406        }
407    
408        /**
409         * Set the action statistics info for the workflow action.
410         *
411         * @param Json representation of the stats.
412         */
413        public void setExecutionStats(String jsonStats) {
414            setStats(jsonStats);
415        }
416    
417        /**
418         * Return the external child IDs.
419         *
420         * @return externalChildIDs as a string.
421         */
422        public String getExternalChildIDs() {
423            return super.getExternalChildIDs();
424        }
425    
426        /**
427         * Set the external child IDs for the workflow action.
428         *
429         * @param externalChildIDs as a string.
430         */
431        public void setExternalChildIDs(String externalChildIDs) {
432            super.setExternalChildIDs(externalChildIDs);
433        }
434    
435        /**
436         * Set the completion information for an action end.
437         *
438         * @param status action status, {@link Action.Status#OK} or {@link Action.Status#ERROR} or {@link
439         * Action.Status#KILLED}
440         * @param signalValue the signal value. In most cases, the value should be OK or ERROR.
441         */
442        public void setEndData(Status status, String signalValue) {
443            if (status == null || (status != Status.OK && status != Status.ERROR && status != Status.KILLED)) {
444                throw new IllegalArgumentException("Action status must be OK, ERROR or KILLED. Received ["
445                        + status.toString() + "]");
446            }
447            if (status == Status.OK) {
448                setErrorInfo(null, null);
449            }
450            setStatus(status);
451            setSignalValue(ParamChecker.notEmpty(signalValue, "signalValue"));
452        }
453    
454    
455        /**
456         * Return the job Id.
457         *
458         * @return the job Id.
459         */
460        public String getJobId() {
461            return wfId;
462        }
463    
464        /**
465         * Return the job Id.
466         *
467         * @return the job Id.
468         */
469        public String getWfId() {
470            return wfId;
471        }
472    
473        /**
474         * Set the job id.
475         *
476         * @param id jobId;
477         */
478        public void setJobId(String id) {
479            this.wfId = id;
480        }
481    
482        public String getSlaXml() {
483            return slaXml;
484        }
485    
486        public void setSlaXml(String slaXml) {
487            this.slaXml = slaXml;
488        }
489    
490        @Override
491        public void setStatus(Status val) {
492            this.status = val.toString();
493            super.setStatus(val);
494        }
495    
496        public String getStatusStr() {
497            return status;
498        }
499    
500        @Override
501        public Status getStatus() {
502            return Status.valueOf(this.status);
503        }
504    
505        /**
506         * Return the node execution path.
507         *
508         * @return the node execution path.
509         */
510        public String getExecutionPath() {
511            return executionPath;
512        }
513    
514        /**
515         * Set the node execution path.
516         *
517         * @param executionPath the node execution path.
518         */
519        public void setExecutionPath(String executionPath) {
520            this.executionPath = executionPath;
521        }
522    
523        /**
524         * Return the signal value for the action. <p/> For decision nodes it is the choosen transition, for actions it is
525         * OK or ERROR.
526         *
527         * @return the action signal value.
528         */
529        public String getSignalValue() {
530            return signalValue;
531        }
532    
533        /**
534         * Set the signal value for the action. <p/> For decision nodes it is the choosen transition, for actions it is OK
535         * or ERROR.
536         *
537         * @param signalValue the action signal value.
538         */
539        public void setSignalValue(String signalValue) {
540            this.signalValue = signalValue;
541        }
542    
543        /**
544         * Return the job log token.
545         *
546         * @return the job log token.
547         */
548        public String getLogToken() {
549            return logToken;
550        }
551    
552        /**
553         * Set the job log token.
554         *
555         * @param logToken the job log token.
556         */
557        public void setLogToken(String logToken) {
558            this.logToken = logToken;
559        }
560    
561        /**
562         * Return the action last check time
563         *
564         * @return the last check time
565         */
566        public Date getLastCheckTime() {
567            return DateUtils.toDate(lastCheckTimestamp);
568        }
569    
570        /**
571         * Return the action last check time
572         *
573         * @return the last check time
574         */
575        public Timestamp getLastCheckTimestamp() {
576            return lastCheckTimestamp;
577        }
578    
579        /**
580         * Return the action last check time
581         *
582         * @return the last check time
583         */
584        public Timestamp getStartTimestamp() {
585            return startTimestamp;
586        }
587    
588        /**
589         * Return the action last check time
590         *
591         * @return the last check time
592         */
593        public Timestamp getEndTimestamp() {
594            return endTimestamp;
595        }
596    
597    
598        /**
599         * Return the action last check time
600         *
601         * @return the last check time
602         */
603        public Timestamp getPendingAgeTimestamp() {
604            return pendingAgeTimestamp;
605        }
606    
607        /**
608         * Sets the action last check time
609         *
610         * @param lastCheckTime the last check time to set.
611         */
612        public void setLastCheckTime(Date lastCheckTime) {
613            this.lastCheckTimestamp = DateUtils.convertDateToTimestamp(lastCheckTime);
614        }
615    
616        public boolean getPending() {
617            return this.pending == 1 ? true : false;
618        }
619    
620        @Override
621        public Date getStartTime() {
622            return DateUtils.toDate(startTimestamp);
623        }
624    
625        @Override
626        public void setStartTime(Date startTime) {
627            super.setStartTime(startTime);
628            this.startTimestamp = DateUtils.convertDateToTimestamp(startTime);
629        }
630    
631        @Override
632        public Date getEndTime() {
633            return DateUtils.toDate(endTimestamp);
634        }
635    
636        @Override
637        public void setEndTime(Date endTime) {
638            super.setEndTime(endTime);
639            this.endTimestamp = DateUtils.convertDateToTimestamp(endTime);
640        }
641    
642    }