001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie;
019    
020    import java.io.DataInput;
021    import java.io.DataOutput;
022    import java.io.IOException;
023    import java.sql.Timestamp;
024    import java.util.Date;
025    import java.util.Properties;
026    
027    import javax.persistence.Basic;
028    import javax.persistence.Column;
029    import javax.persistence.Entity;
030    import javax.persistence.Lob;
031    import javax.persistence.NamedQueries;
032    import javax.persistence.NamedQuery;
033    import javax.persistence.Transient;
034    
035    import org.apache.hadoop.io.Writable;
036    import org.apache.oozie.client.WorkflowAction;
037    import org.apache.oozie.client.rest.JsonWorkflowAction;
038    import org.apache.oozie.util.DateUtils;
039    import org.apache.oozie.util.ParamChecker;
040    import org.apache.oozie.util.PropertiesUtils;
041    import org.apache.oozie.util.WritableUtils;
042    import org.apache.openjpa.persistence.jdbc.Index;
043    
044    /**
045     * Bean that contains all the information to start an action for a workflow node.
046     */
047    @Entity
048    @NamedQueries({
049    
050        @NamedQuery(name = "UPDATE_ACTION", query = "update WorkflowActionBean a set a.conf = :conf, a.consoleUrl = :consoleUrl, a.data = :data, a.errorCode = :errorCode, a.errorMessage = :errorMessage, a.externalId = :externalId, a.externalStatus = :externalStatus, a.name = :name, a.cred = :cred , a.retries = :retries, a.trackerUri = :trackerUri, a.transition = :transition, a.type = :type, a.endTimestamp = :endTime, a.executionPath = :executionPath, a.lastCheckTimestamp = :lastCheckTime, a.logToken = :logToken, a.pending = :pending, a.pendingAgeTimestamp = :pendingAge, a.signalValue = :signalValue, a.slaXml = :slaXml, a.startTimestamp = :startTime, a.status = :status, a.wfId=:wfId where a.id = :id"),
051    
052        @NamedQuery(name = "DELETE_ACTION", query = "delete from WorkflowActionBean a where a.id = :id"),
053    
054        @NamedQuery(name = "DELETE_ACTIONS_FOR_WORKFLOW", query = "delete from WorkflowActionBean a where a.wfId = :wfId"),
055    
056        @NamedQuery(name = "GET_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a"),
057    
058        @NamedQuery(name = "GET_ACTION", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),
059    
060        @NamedQuery(name = "GET_ACTION_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),
061    
062        @NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
063    
064        @NamedQuery(name = "GET_ACTIONS_OF_WORKFLOW_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),
065    
066        @NamedQuery(name = "GET_PENDING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.pendingAgeTimestamp < :pendingAge AND a.status <> 'RUNNING'"),
067    
068        @NamedQuery(name = "GET_RUNNING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.status = 'RUNNING' AND a.lastCheckTimestamp < :lastCheckTime"),
069    
070        @NamedQuery(name = "GET_RETRY_MANUAL_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId AND (a.status = 'START_RETRY' OR a.status = 'START_MANUAL' OR a.status = 'END_RETRY' OR a.status = 'END_MANUAL')") })
071    
072    public class WorkflowActionBean extends JsonWorkflowAction implements Writable {
073    
074        @Basic
075        @Index
076        @Column(name = "wf_id")
077        private String wfId = null;
078    
079        @Basic
080        @Index
081        @Column(name = "status")
082        private String status = WorkflowAction.Status.PREP.toString();
083    
084        @Basic
085        @Column(name = "last_check_time")
086        private java.sql.Timestamp lastCheckTimestamp;
087    
088        @Basic
089        @Column(name = "end_time")
090        private java.sql.Timestamp endTimestamp = null;
091    
092        @Basic
093        @Column(name = "start_time")
094        private java.sql.Timestamp startTimestamp = null;
095    
096        @Basic
097        @Column(name = "execution_path", length = 1024)
098        private String executionPath = null;
099    
100        @Basic
101        @Column(name = "pending")
102        private int pending = 0;
103    
104        // @Temporal(TemporalType.TIME)
105        // @Column(name="pending_age",columnDefinition="timestamp default '0000-00-00 00:00:00'")
106        @Basic
107        @Index
108        @Column(name = "pending_age")
109        private java.sql.Timestamp pendingAgeTimestamp = null;
110    
111        @Basic
112        @Column(name = "signal_value")
113        private String signalValue = null;
114    
115        @Basic
116        @Column(name = "log_token")
117        private String logToken = null;
118    
119        @Transient
120        private Date pendingAge;
121    
122        @Column(name = "sla_xml")
123        @Lob
124        private String slaXml = null;
125    
126        /**
127         * Default constructor.
128         */
129        public WorkflowActionBean() {
130        }
131    
132        /**
133         * Serialize the action bean to a data output.
134         *
135         * @param dataOutput data output.
136         * @throws IOException thrown if the action bean could not be serialized.
137         */
138    
139        public void write(DataOutput dataOutput) throws IOException {
140            WritableUtils.writeStr(dataOutput, getId());
141            WritableUtils.writeStr(dataOutput, getName());
142            WritableUtils.writeStr(dataOutput, getCred());
143            WritableUtils.writeStr(dataOutput, getType());
144            WritableUtils.writeStr(dataOutput, getConf());
145            WritableUtils.writeStr(dataOutput, getStatusStr());
146            dataOutput.writeInt(getRetries());
147            dataOutput.writeLong((getStartTime() != null) ? getStartTime().getTime() : -1);
148            dataOutput.writeLong((getEndTime() != null) ? getEndTime().getTime() : -1);
149            dataOutput.writeLong((getLastCheckTime() != null) ? getLastCheckTime().getTime() : -1);
150            WritableUtils.writeStr(dataOutput, getTransition());
151            WritableUtils.writeStr(dataOutput, getData());
152            WritableUtils.writeStr(dataOutput, getExternalId());
153            WritableUtils.writeStr(dataOutput, getExternalStatus());
154            WritableUtils.writeStr(dataOutput, getTrackerUri());
155            WritableUtils.writeStr(dataOutput, getConsoleUrl());
156            WritableUtils.writeStr(dataOutput, getErrorCode());
157            WritableUtils.writeStr(dataOutput, getErrorMessage());
158            WritableUtils.writeStr(dataOutput, wfId);
159            WritableUtils.writeStr(dataOutput, executionPath);
160            dataOutput.writeInt(pending);
161            dataOutput.writeLong((pendingAge != null) ? pendingAge.getTime() : -1);
162            WritableUtils.writeStr(dataOutput, signalValue);
163            WritableUtils.writeStr(dataOutput, logToken);
164            dataOutput.writeInt(getUserRetryCount());
165            dataOutput.writeInt(getUserRetryInterval());
166            dataOutput.writeInt(getUserRetryMax());
167        }
168    
169        /**
170         * Deserialize an action bean from a data input.
171         *
172         * @param dataInput data input.
173         * @throws IOException thrown if the action bean could not be deserialized.
174         */
175        public void readFields(DataInput dataInput) throws IOException {
176            setId(WritableUtils.readStr(dataInput));
177            setName(WritableUtils.readStr(dataInput));
178            setCred(WritableUtils.readStr(dataInput));
179            setType(WritableUtils.readStr(dataInput));
180            setConf(WritableUtils.readStr(dataInput));
181            setStatus(WorkflowAction.Status.valueOf(WritableUtils.readStr(dataInput)));
182            setRetries(dataInput.readInt());
183            long d = dataInput.readLong();
184            if (d != -1) {
185                setStartTime(new Date(d));
186            }
187            d = dataInput.readLong();
188            if (d != -1) {
189                setEndTime(new Date(d));
190            }
191            d = dataInput.readLong();
192            if (d != -1) {
193                setLastCheckTime(new Date(d));
194            }
195            setTransition(WritableUtils.readStr(dataInput));
196            setData(WritableUtils.readStr(dataInput));
197            setExternalId(WritableUtils.readStr(dataInput));
198            setExternalStatus(WritableUtils.readStr(dataInput));
199            setTrackerUri(WritableUtils.readStr(dataInput));
200            setConsoleUrl(WritableUtils.readStr(dataInput));
201            setErrorInfo(WritableUtils.readStr(dataInput), WritableUtils.readStr(dataInput));
202            wfId = WritableUtils.readStr(dataInput);
203            executionPath = WritableUtils.readStr(dataInput);
204            pending = dataInput.readInt();
205            d = dataInput.readLong();
206            if (d != -1) {
207                pendingAge = new Date(d);
208                pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
209            }
210            signalValue = WritableUtils.readStr(dataInput);
211            logToken = WritableUtils.readStr(dataInput);
212            setUserRetryCount(dataInput.readInt());
213            setUserRetryInterval(dataInput.readInt());
214            setUserRetryMax(dataInput.readInt());
215        }
216    
217        /**
218         * Return if the action execution is complete.
219         *
220         * @return if the action start is complete.
221         */
222        public boolean isExecutionComplete() {
223            return getStatus() == WorkflowAction.Status.DONE;
224        }
225    
226        /**
227         * Return if the action is START_RETRY or START_MANUAL or END_RETRY or
228         * END_MANUAL.
229         *
230         * @return boolean true if status is START_RETRY or START_MANUAL or END_RETRY or
231         *         END_MANUAL
232         */
233        public boolean isRetryOrManual() {
234            return (getStatus() == WorkflowAction.Status.START_RETRY || getStatus() == WorkflowAction.Status.START_MANUAL
235                    || getStatus() == WorkflowAction.Status.END_RETRY || getStatus() == WorkflowAction.Status.END_MANUAL);
236        }
237        
238        /**
239         * Return true if the action is USER_RETRY
240         *
241         * @return boolean true if status is USER_RETRY
242         */
243        public boolean isUserRetry() {
244            return (getStatus() == WorkflowAction.Status.USER_RETRY);
245        }
246    
247        /**
248         * Return if the action is complete.
249         *
250         * @return if the action is complete.
251         */
252        public boolean isComplete() {
253            return getStatus() == WorkflowAction.Status.OK || getStatus() == WorkflowAction.Status.KILLED ||
254                    getStatus() == WorkflowAction.Status.ERROR;
255        }
256    
257        /**
258         * Set the action pending flag to true.
259         */
260        public void setPendingOnly() {
261            pending = 1;
262        }
263    
264        /**
265         * Set the action as pending and the current time as pending.
266         */
267        public void setPending() {
268            pending = 1;
269            pendingAge = new Date();
270            pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
271        }
272    
273        /**
274         * Set a time when the action will be pending, normally a time in the future.
275         *
276         * @param pendingAge the time when the action will be pending.
277         */
278        public void setPendingAge(Date pendingAge) {
279            this.pendingAge = pendingAge;
280            this.pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
281        }
282    
283        /**
284         * Return the pending age of the action.
285         *
286         * @return the pending age of the action, <code>null</code> if the action is not pending.
287         */
288        public Date getPendingAge() {
289            return DateUtils.toDate(pendingAgeTimestamp);
290        }
291    
292        /**
293         * Return if the action is pending.
294         *
295         * @return if the action is pending.
296         */
297        public boolean isPending() {
298            return pending == 1 ? true : false;
299        }
300    
301        /**
302         * Removes the pending flag and pendingAge from the action.
303         */
304        public void resetPending() {
305            pending = 0;
306            pendingAge = null;
307            pendingAgeTimestamp = null;
308        }
309    
310        /**
311         * Removes the pending flag from the action.
312         */
313        public void resetPendingOnly() {
314            pending = 0;
315        }
316    
317        /**
318         * Increments the number of retries for the action.
319         */
320        public void incRetries() {
321            setRetries(getRetries() + 1);
322        }
323    
324        /**
325         * Set a tracking information for an action, and set the action status to {@link Action.Status#DONE}
326         *
327         * @param externalId external ID for the action.
328         * @param trackerUri tracker URI for the action.
329         * @param consoleUrl console URL for the action.
330         */
331        public void setStartData(String externalId, String trackerUri, String consoleUrl) {
332            setExternalId(ParamChecker.notEmpty(externalId, "externalId"));
333            setTrackerUri(ParamChecker.notEmpty(trackerUri, "trackerUri"));
334            setConsoleUrl(ParamChecker.notEmpty(consoleUrl, "consoleUrl"));
335            Date now = new Date();
336            setStartTime(now);
337            setLastCheckTime(now);
338            setStatus(Status.RUNNING);
339        }
340    
341        /**
342         * Set the completion information for an action start. Sets the Action status to {@link Action.Status#DONE}
343         *
344         * @param externalStatus action external end status.
345         * @param actionData action output data, <code>null</code> if there is no action output data.
346         */
347        public void setExecutionData(String externalStatus, Properties actionData) {
348            setStatus(Status.DONE);
349            setExternalStatus(ParamChecker.notEmpty(externalStatus, "externalStatus"));
350            if (actionData != null) {
351                setData(PropertiesUtils.propertiesToString(actionData));
352            }
353        }
354    
355        /**
356         * Set the completion information for an action end.
357         *
358         * @param status action status, {@link Action.Status#OK} or {@link Action.Status#ERROR} or {@link
359         * Action.Status#KILLED}
360         * @param signalValue the signal value. In most cases, the value should be OK or ERROR.
361         */
362        public void setEndData(Status status, String signalValue) {
363            if (status == null || (status != Status.OK && status != Status.ERROR && status != Status.KILLED)) {
364                throw new IllegalArgumentException("Action status must be OK, ERROR or KILLED. Received ["
365                        + status.toString() + "]");
366            }
367            if (status == Status.OK) {
368                setErrorInfo(null, null);
369            }
370            setStatus(status);
371            setSignalValue(ParamChecker.notEmpty(signalValue, "signalValue"));
372        }
373    
374    
375        /**
376         * Return the job Id.
377         *
378         * @return the job Id.
379         */
380        public String getJobId() {
381            return wfId;
382        }
383    
384        /**
385         * Return the job Id.
386         *
387         * @return the job Id.
388         */
389        public String getWfId() {
390            return wfId;
391        }
392    
393        /**
394         * Set the job id.
395         *
396         * @param id jobId;
397         */
398        public void setJobId(String id) {
399            this.wfId = id;
400        }
401    
402        public String getSlaXml() {
403            return slaXml;
404        }
405    
406        public void setSlaXml(String slaXml) {
407            this.slaXml = slaXml;
408        }
409    
410        @Override
411        public void setStatus(Status val) {
412            this.status = val.toString();
413            super.setStatus(val);
414        }
415    
416        public String getStatusStr() {
417            return status;
418        }
419    
420        @Override
421        public Status getStatus() {
422            return Status.valueOf(this.status);
423        }
424    
425        /**
426         * Return the node execution path.
427         *
428         * @return the node execution path.
429         */
430        public String getExecutionPath() {
431            return executionPath;
432        }
433    
434        /**
435         * Set the node execution path.
436         *
437         * @param executionPath the node execution path.
438         */
439        public void setExecutionPath(String executionPath) {
440            this.executionPath = executionPath;
441        }
442    
443        /**
444         * Return the signal value for the action. <p/> For decision nodes it is the choosen transition, for actions it is
445         * OK or ERROR.
446         *
447         * @return the action signal value.
448         */
449        public String getSignalValue() {
450            return signalValue;
451        }
452    
453        /**
454         * Set the signal value for the action. <p/> For decision nodes it is the choosen transition, for actions it is OK
455         * or ERROR.
456         *
457         * @param signalValue the action signal value.
458         */
459        public void setSignalValue(String signalValue) {
460            this.signalValue = signalValue;
461        }
462    
463        /**
464         * Return the job log token.
465         *
466         * @return the job log token.
467         */
468        public String getLogToken() {
469            return logToken;
470        }
471    
472        /**
473         * Set the job log token.
474         *
475         * @param logToken the job log token.
476         */
477        public void setLogToken(String logToken) {
478            this.logToken = logToken;
479        }
480    
481        /**
482         * Return the action last check time
483         *
484         * @return the last check time
485         */
486        public Date getLastCheckTime() {
487            return DateUtils.toDate(lastCheckTimestamp);
488        }
489    
490        /**
491         * Return the action last check time
492         *
493         * @return the last check time
494         */
495        public Timestamp getLastCheckTimestamp() {
496            return lastCheckTimestamp;
497        }
498    
499        /**
500         * Return the action last check time
501         *
502         * @return the last check time
503         */
504        public Timestamp getStartTimestamp() {
505            return startTimestamp;
506        }
507    
508        /**
509         * Return the action last check time
510         *
511         * @return the last check time
512         */
513        public Timestamp getEndTimestamp() {
514            return endTimestamp;
515        }
516    
517    
518        /**
519         * Return the action last check time
520         *
521         * @return the last check time
522         */
523        public Timestamp getPendingAgeTimestamp() {
524            return pendingAgeTimestamp;
525        }
526    
527        /**
528         * Sets the action last check time
529         *
530         * @param lastCheckTime the last check time to set.
531         */
532        public void setLastCheckTime(Date lastCheckTime) {
533            this.lastCheckTimestamp = DateUtils.convertDateToTimestamp(lastCheckTime);
534        }
535    
536        public boolean getPending() {
537            return this.pending == 1 ? true : false;
538        }
539    
540        @Override
541        public Date getStartTime() {
542            return DateUtils.toDate(startTimestamp);
543        }
544    
545        @Override
546        public void setStartTime(Date startTime) {
547            super.setStartTime(startTime);
548            this.startTimestamp = DateUtils.convertDateToTimestamp(startTime);
549        }
550    
551        @Override
552        public Date getEndTime() {
553            return DateUtils.toDate(endTimestamp);
554        }
555    
556        @Override
557        public void setEndTime(Date endTime) {
558            super.setEndTime(endTime);
559            this.endTimestamp = DateUtils.convertDateToTimestamp(endTime);
560        }
561    
562    }