001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie;
019    
020    import java.io.DataInput;
021    import java.io.DataOutput;
022    import java.io.IOException;
023    import java.sql.Timestamp;
024    import java.util.Date;
025    import java.util.Properties;
026    
027    import javax.persistence.Basic;
028    import javax.persistence.Column;
029    import javax.persistence.Entity;
030    import javax.persistence.Lob;
031    import javax.persistence.NamedQueries;
032    import javax.persistence.NamedQuery;
033    import javax.persistence.Transient;
034    
035    import org.apache.hadoop.io.Writable;
036    import org.apache.oozie.client.WorkflowAction;
037    import org.apache.oozie.client.rest.JsonWorkflowAction;
038    import org.apache.oozie.util.DateUtils;
039    import org.apache.oozie.util.ParamChecker;
040    import org.apache.oozie.util.PropertiesUtils;
041    import org.apache.oozie.util.WritableUtils;
042    import org.apache.openjpa.persistence.jdbc.Index;
043    
044    /**
045     * Bean that contains all the information to start an action for a workflow node.
046     */
047    @Entity
048    @NamedQueries({
049    
050        @NamedQuery(name = "UPDATE_ACTION", query = "update WorkflowActionBean a set a.conf = :conf, a.consoleUrl = :consoleUrl, a.data = :data, a.stats = :stats, a.externalChildIDs = :externalChildIDs, a.errorCode = :errorCode, a.errorMessage = :errorMessage, a.externalId = :externalId, a.externalStatus = :externalStatus, a.name = :name, a.cred = :cred , a.retries = :retries, a.trackerUri = :trackerUri, a.transition = :transition, a.type = :type, a.endTimestamp = :endTime, a.executionPath = :executionPath, a.lastCheckTimestamp = :lastCheckTime, a.logToken = :logToken, a.pending = :pending, a.pendingAgeTimestamp = :pendingAge, a.signalValue = :signalValue, a.slaXml = :slaXml, a.startTimestamp = :startTime, a.status = :status, a.wfId=:wfId where a.id = :id"),
051    
052        @NamedQuery(name = "DELETE_ACTION", query = "delete from WorkflowActionBean a where a.id = :id"),
053    
054        @NamedQuery(name = "DELETE_ACTIONS_FOR_WORKFLOW", query = "delete from WorkflowActionBean a where a.wfId = :wfId"),
055    
056        @NamedQuery(name = "GET_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a"),
057    
058        @NamedQuery(name = "GET_ACTION", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),
059    
060        @NamedQuery(name = "GET_ACTION_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),
061    
062        @NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
063    
064        @NamedQuery(name = "GET_ACTIONS_OF_WORKFLOW_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
065    
066        @NamedQuery(name = "GET_PENDING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.pendingAgeTimestamp < :pendingAge AND a.status <> 'RUNNING'"),
067    
068        @NamedQuery(name = "GET_RUNNING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.status = 'RUNNING' AND a.lastCheckTimestamp < :lastCheckTime"),
069    
070        @NamedQuery(name = "GET_RETRY_MANUAL_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId AND (a.status = 'START_RETRY' OR a.status = 'START_MANUAL' OR a.status = 'END_RETRY' OR a.status = 'END_MANUAL')") })
071    
072    public class WorkflowActionBean extends JsonWorkflowAction implements Writable {
073    
074        @Basic
075        @Index
076        @Column(name = "wf_id")
077        private String wfId = null;
078    
079        @Basic
080        @Index
081        @Column(name = "status")
082        private String status = WorkflowAction.Status.PREP.toString();
083    
084        @Basic
085        @Column(name = "last_check_time")
086        private java.sql.Timestamp lastCheckTimestamp;
087    
088        @Basic
089        @Column(name = "end_time")
090        private java.sql.Timestamp endTimestamp = null;
091    
092        @Basic
093        @Column(name = "start_time")
094        private java.sql.Timestamp startTimestamp = null;
095    
096        @Basic
097        @Column(name = "execution_path", length = 1024)
098        private String executionPath = null;
099    
100        @Basic
101        @Column(name = "pending")
102        private int pending = 0;
103    
104        // @Temporal(TemporalType.TIME)
105        // @Column(name="pending_age",columnDefinition="timestamp default '0000-00-00 00:00:00'")
106        @Basic
107        @Index
108        @Column(name = "pending_age")
109        private java.sql.Timestamp pendingAgeTimestamp = null;
110    
111        @Basic
112        @Column(name = "signal_value")
113        private String signalValue = null;
114    
115        @Basic
116        @Column(name = "log_token")
117        private String logToken = null;
118    
119        @Transient
120        private Date pendingAge;
121    
122        @Column(name = "sla_xml")
123        @Lob
124        private String slaXml = null;
125    
126        /**
127         * Default constructor.
128         */
129        public WorkflowActionBean() {
130        }
131    
132        /**
133         * Serialize the action bean to a data output.
134         *
135         * @param dataOutput data output.
136         * @throws IOException thrown if the action bean could not be serialized.
137         */
138    
139        public void write(DataOutput dataOutput) throws IOException {
140            WritableUtils.writeStr(dataOutput, getId());
141            WritableUtils.writeStr(dataOutput, getName());
142            WritableUtils.writeStr(dataOutput, getCred());
143            WritableUtils.writeStr(dataOutput, getType());
144            WritableUtils.writeStr(dataOutput, getConf());
145            WritableUtils.writeStr(dataOutput, getStatusStr());
146            dataOutput.writeInt(getRetries());
147            dataOutput.writeLong((getStartTime() != null) ? getStartTime().getTime() : -1);
148            dataOutput.writeLong((getEndTime() != null) ? getEndTime().getTime() : -1);
149            dataOutput.writeLong((getLastCheckTime() != null) ? getLastCheckTime().getTime() : -1);
150            WritableUtils.writeStr(dataOutput, getTransition());
151            WritableUtils.writeStr(dataOutput, getData());
152            WritableUtils.writeStr(dataOutput, getStats());
153            WritableUtils.writeStr(dataOutput, getExternalChildIDs());
154            WritableUtils.writeStr(dataOutput, getExternalId());
155            WritableUtils.writeStr(dataOutput, getExternalStatus());
156            WritableUtils.writeStr(dataOutput, getTrackerUri());
157            WritableUtils.writeStr(dataOutput, getConsoleUrl());
158            WritableUtils.writeStr(dataOutput, getErrorCode());
159            WritableUtils.writeStr(dataOutput, getErrorMessage());
160            WritableUtils.writeStr(dataOutput, wfId);
161            WritableUtils.writeStr(dataOutput, executionPath);
162            dataOutput.writeInt(pending);
163            dataOutput.writeLong((pendingAge != null) ? pendingAge.getTime() : -1);
164            WritableUtils.writeStr(dataOutput, signalValue);
165            WritableUtils.writeStr(dataOutput, logToken);
166            dataOutput.writeInt(getUserRetryCount());
167            dataOutput.writeInt(getUserRetryInterval());
168            dataOutput.writeInt(getUserRetryMax());
169        }
170    
171        /**
172         * Deserialize an action bean from a data input.
173         *
174         * @param dataInput data input.
175         * @throws IOException thrown if the action bean could not be deserialized.
176         */
177        public void readFields(DataInput dataInput) throws IOException {
178            setId(WritableUtils.readStr(dataInput));
179            setName(WritableUtils.readStr(dataInput));
180            setCred(WritableUtils.readStr(dataInput));
181            setType(WritableUtils.readStr(dataInput));
182            setConf(WritableUtils.readStr(dataInput));
183            setStatus(WorkflowAction.Status.valueOf(WritableUtils.readStr(dataInput)));
184            setRetries(dataInput.readInt());
185            long d = dataInput.readLong();
186            if (d != -1) {
187                setStartTime(new Date(d));
188            }
189            d = dataInput.readLong();
190            if (d != -1) {
191                setEndTime(new Date(d));
192            }
193            d = dataInput.readLong();
194            if (d != -1) {
195                setLastCheckTime(new Date(d));
196            }
197            setTransition(WritableUtils.readStr(dataInput));
198            setData(WritableUtils.readStr(dataInput));
199            setStats(WritableUtils.readStr(dataInput));
200            setExternalChildIDs(WritableUtils.readStr(dataInput));
201            setExternalId(WritableUtils.readStr(dataInput));
202            setExternalStatus(WritableUtils.readStr(dataInput));
203            setTrackerUri(WritableUtils.readStr(dataInput));
204            setConsoleUrl(WritableUtils.readStr(dataInput));
205            setErrorInfo(WritableUtils.readStr(dataInput), WritableUtils.readStr(dataInput));
206            wfId = WritableUtils.readStr(dataInput);
207            executionPath = WritableUtils.readStr(dataInput);
208            pending = dataInput.readInt();
209            d = dataInput.readLong();
210            if (d != -1) {
211                pendingAge = new Date(d);
212                pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
213            }
214            signalValue = WritableUtils.readStr(dataInput);
215            logToken = WritableUtils.readStr(dataInput);
216            setUserRetryCount(dataInput.readInt());
217            setUserRetryInterval(dataInput.readInt());
218            setUserRetryMax(dataInput.readInt());
219        }
220    
221        /**
222         * Return if the action execution is complete.
223         *
224         * @return if the action start is complete.
225         */
226        public boolean isExecutionComplete() {
227            return getStatus() == WorkflowAction.Status.DONE;
228        }
229    
230        /**
231         * Return if the action is START_RETRY or START_MANUAL or END_RETRY or
232         * END_MANUAL.
233         *
234         * @return boolean true if status is START_RETRY or START_MANUAL or END_RETRY or
235         *         END_MANUAL
236         */
237        public boolean isRetryOrManual() {
238            return (getStatus() == WorkflowAction.Status.START_RETRY || getStatus() == WorkflowAction.Status.START_MANUAL
239                    || getStatus() == WorkflowAction.Status.END_RETRY || getStatus() == WorkflowAction.Status.END_MANUAL);
240        }
241        
242        /**
243         * Return true if the action is USER_RETRY
244         *
245         * @return boolean true if status is USER_RETRY
246         */
247        public boolean isUserRetry() {
248            return (getStatus() == WorkflowAction.Status.USER_RETRY);
249        }
250    
251        /**
252         * Return if the action is complete.
253         *
254         * @return if the action is complete.
255         */
256        public boolean isComplete() {
257            return getStatus() == WorkflowAction.Status.OK || getStatus() == WorkflowAction.Status.KILLED ||
258                    getStatus() == WorkflowAction.Status.ERROR;
259        }
260    
261        /**
262         * Set the action pending flag to true.
263         */
264        public void setPendingOnly() {
265            pending = 1;
266        }
267    
268        /**
269         * Set the action as pending and the current time as pending.
270         */
271        public void setPending() {
272            pending = 1;
273            pendingAge = new Date();
274            pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
275        }
276    
277        /**
278         * Set a time when the action will be pending, normally a time in the future.
279         *
280         * @param pendingAge the time when the action will be pending.
281         */
282        public void setPendingAge(Date pendingAge) {
283            this.pendingAge = pendingAge;
284            this.pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
285        }
286    
287        /**
288         * Return the pending age of the action.
289         *
290         * @return the pending age of the action, <code>null</code> if the action is not pending.
291         */
292        public Date getPendingAge() {
293            return DateUtils.toDate(pendingAgeTimestamp);
294        }
295    
296        /**
297         * Return if the action is pending.
298         *
299         * @return if the action is pending.
300         */
301        public boolean isPending() {
302            return pending == 1 ? true : false;
303        }
304    
305        /**
306         * Removes the pending flag and pendingAge from the action.
307         */
308        public void resetPending() {
309            pending = 0;
310            pendingAge = null;
311            pendingAgeTimestamp = null;
312        }
313    
314        /**
315         * Removes the pending flag from the action.
316         */
317        public void resetPendingOnly() {
318            pending = 0;
319        }
320    
321        /**
322         * Increments the number of retries for the action.
323         */
324        public void incRetries() {
325            setRetries(getRetries() + 1);
326        }
327    
328        /**
329         * Set a tracking information for an action, and set the action status to {@link Action.Status#DONE}
330         *
331         * @param externalId external ID for the action.
332         * @param trackerUri tracker URI for the action.
333         * @param consoleUrl console URL for the action.
334         */
335        public void setStartData(String externalId, String trackerUri, String consoleUrl) {
336            setExternalId(ParamChecker.notEmpty(externalId, "externalId"));
337            setTrackerUri(ParamChecker.notEmpty(trackerUri, "trackerUri"));
338            setConsoleUrl(ParamChecker.notEmpty(consoleUrl, "consoleUrl"));
339            Date now = new Date();
340            if (this.startTimestamp == null) {
341                setStartTime(now);
342            }
343            setLastCheckTime(now);
344            setStatus(Status.RUNNING);
345        }
346    
347        /**
348         * Set the completion information for an action start. Sets the Action status to {@link Action.Status#DONE}
349         *
350         * @param externalStatus action external end status.
351         * @param actionData action output data, <code>null</code> if there is no action output data.
352         */
353        public void setExecutionData(String externalStatus, Properties actionData) {
354            setStatus(Status.DONE);
355            setExternalStatus(ParamChecker.notEmpty(externalStatus, "externalStatus"));
356            if (actionData != null) {
357                setData(PropertiesUtils.propertiesToString(actionData));
358            }
359        }
360    
361        /**
362         * Return the action statistics info.
363         *
364         * @return Json representation of the stats.
365         */
366        public String getExecutionStats() {
367            return getStats();
368        }
369    
370        /**
371         * Set the action statistics info for the workflow action.
372         *
373         * @param Json representation of the stats.
374         */
375        public void setExecutionStats(String jsonStats) {
376            setStats(jsonStats);
377        }
378    
379        /**
380         * Return the external child IDs.
381         *
382         * @return externalChildIDs as a string.
383         */
384        public String getExternalChildIDs() {
385            return super.getExternalChildIDs();
386        }
387    
388        /**
389         * Set the external child IDs for the workflow action.
390         *
391         * @param externalChildIDs as a string.
392         */
393        public void setExternalChildIDs(String externalChildIDs) {
394            super.setExternalChildIDs(externalChildIDs);
395        }
396    
397        /**
398         * Set the completion information for an action end.
399         *
400         * @param status action status, {@link Action.Status#OK} or {@link Action.Status#ERROR} or {@link
401         * Action.Status#KILLED}
402         * @param signalValue the signal value. In most cases, the value should be OK or ERROR.
403         */
404        public void setEndData(Status status, String signalValue) {
405            if (status == null || (status != Status.OK && status != Status.ERROR && status != Status.KILLED)) {
406                throw new IllegalArgumentException("Action status must be OK, ERROR or KILLED. Received ["
407                        + status.toString() + "]");
408            }
409            if (status == Status.OK) {
410                setErrorInfo(null, null);
411            }
412            setStatus(status);
413            setSignalValue(ParamChecker.notEmpty(signalValue, "signalValue"));
414        }
415    
416    
417        /**
418         * Return the job Id.
419         *
420         * @return the job Id.
421         */
422        public String getJobId() {
423            return wfId;
424        }
425    
426        /**
427         * Return the job Id.
428         *
429         * @return the job Id.
430         */
431        public String getWfId() {
432            return wfId;
433        }
434    
435        /**
436         * Set the job id.
437         *
438         * @param id jobId;
439         */
440        public void setJobId(String id) {
441            this.wfId = id;
442        }
443    
444        public String getSlaXml() {
445            return slaXml;
446        }
447    
448        public void setSlaXml(String slaXml) {
449            this.slaXml = slaXml;
450        }
451    
452        @Override
453        public void setStatus(Status val) {
454            this.status = val.toString();
455            super.setStatus(val);
456        }
457    
458        public String getStatusStr() {
459            return status;
460        }
461    
462        @Override
463        public Status getStatus() {
464            return Status.valueOf(this.status);
465        }
466    
467        /**
468         * Return the node execution path.
469         *
470         * @return the node execution path.
471         */
472        public String getExecutionPath() {
473            return executionPath;
474        }
475    
476        /**
477         * Set the node execution path.
478         *
479         * @param executionPath the node execution path.
480         */
481        public void setExecutionPath(String executionPath) {
482            this.executionPath = executionPath;
483        }
484    
485        /**
486         * Return the signal value for the action. <p/> For decision nodes it is the choosen transition, for actions it is
487         * OK or ERROR.
488         *
489         * @return the action signal value.
490         */
491        public String getSignalValue() {
492            return signalValue;
493        }
494    
495        /**
496         * Set the signal value for the action. <p/> For decision nodes it is the choosen transition, for actions it is OK
497         * or ERROR.
498         *
499         * @param signalValue the action signal value.
500         */
501        public void setSignalValue(String signalValue) {
502            this.signalValue = signalValue;
503        }
504    
505        /**
506         * Return the job log token.
507         *
508         * @return the job log token.
509         */
510        public String getLogToken() {
511            return logToken;
512        }
513    
514        /**
515         * Set the job log token.
516         *
517         * @param logToken the job log token.
518         */
519        public void setLogToken(String logToken) {
520            this.logToken = logToken;
521        }
522    
523        /**
524         * Return the action last check time
525         *
526         * @return the last check time
527         */
528        public Date getLastCheckTime() {
529            return DateUtils.toDate(lastCheckTimestamp);
530        }
531    
532        /**
533         * Return the action last check time
534         *
535         * @return the last check time
536         */
537        public Timestamp getLastCheckTimestamp() {
538            return lastCheckTimestamp;
539        }
540    
541        /**
542         * Return the action last check time
543         *
544         * @return the last check time
545         */
546        public Timestamp getStartTimestamp() {
547            return startTimestamp;
548        }
549    
550        /**
551         * Return the action last check time
552         *
553         * @return the last check time
554         */
555        public Timestamp getEndTimestamp() {
556            return endTimestamp;
557        }
558    
559    
560        /**
561         * Return the action last check time
562         *
563         * @return the last check time
564         */
565        public Timestamp getPendingAgeTimestamp() {
566            return pendingAgeTimestamp;
567        }
568    
569        /**
570         * Sets the action last check time
571         *
572         * @param lastCheckTime the last check time to set.
573         */
574        public void setLastCheckTime(Date lastCheckTime) {
575            this.lastCheckTimestamp = DateUtils.convertDateToTimestamp(lastCheckTime);
576        }
577    
578        public boolean getPending() {
579            return this.pending == 1 ? true : false;
580        }
581    
582        @Override
583        public Date getStartTime() {
584            return DateUtils.toDate(startTimestamp);
585        }
586    
587        @Override
588        public void setStartTime(Date startTime) {
589            super.setStartTime(startTime);
590            this.startTimestamp = DateUtils.convertDateToTimestamp(startTime);
591        }
592    
593        @Override
594        public Date getEndTime() {
595            return DateUtils.toDate(endTimestamp);
596        }
597    
598        @Override
599        public void setEndTime(Date endTime) {
600            super.setEndTime(endTime);
601            this.endTimestamp = DateUtils.convertDateToTimestamp(endTime);
602        }
603    
604    }