001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie;
019    
020    import java.io.DataInput;
021    import java.io.DataOutput;
022    import java.io.IOException;
023    import java.sql.Timestamp;
024    import java.util.Date;
025    
026    import javax.persistence.Basic;
027    import javax.persistence.Column;
028    import javax.persistence.ColumnResult;
029    import javax.persistence.Entity;
030    import javax.persistence.Lob;
031    import javax.persistence.NamedNativeQueries;
032    import javax.persistence.NamedNativeQuery;
033    import javax.persistence.NamedQueries;
034    import javax.persistence.NamedQuery;
035    import javax.persistence.SqlResultSetMapping;
036    
037    import org.apache.hadoop.io.Writable;
038    import org.apache.oozie.client.CoordinatorAction;
039    import org.apache.oozie.client.rest.JsonCoordinatorAction;
040    import org.apache.oozie.util.DateUtils;
041    import org.apache.oozie.util.WritableUtils;
042    import org.apache.openjpa.persistence.jdbc.Index;
043    
044    @SqlResultSetMapping(
045            name = "CoordActionJobIdLmt",
046            columns = {@ColumnResult(name = "job_id"),
047                @ColumnResult(name = "min_lmt")})
048    
049    @Entity
050    @NamedQueries({
051    
052            @NamedQuery(name = "UPDATE_COORD_ACTION", query = "update CoordinatorActionBean w set w.actionNumber = :actionNumber, w.actionXml = :actionXml, w.consoleUrl = :consoleUrl, w.createdConf = :createdConf, w.errorCode = :errorCode, w.errorMessage = :errorMessage, w.externalStatus = :externalStatus, w.missingDependencies = :missingDependencies, w.runConf = :runConf, w.timeOut = :timeOut, w.trackerUri = :trackerUri, w.type = :type, w.createdTimestamp = :createdTime, w.externalId = :externalId, w.jobId = :jobId, w.lastModifiedTimestamp = :lastModifiedTime, w.nominalTimestamp = :nominalTime, w.slaXml = :slaXml, w.status = :status where w.id = :id"),
053    
054            @NamedQuery(name = "UPDATE_COORD_ACTION_MIN", query = "update CoordinatorActionBean w set w.actionXml = :actionXml, w.missingDependencies = :missingDependencies, w.lastModifiedTimestamp = :lastModifiedTime, w.status = :status where w.id = :id"),
055            // Query to update the action status, pending status and last modified time stamp of a Coordinator action
056            @NamedQuery(name = "UPDATE_COORD_ACTION_STATUS_PENDING_TIME", query = "update CoordinatorActionBean w set w.status =:status, w.pending =:pending, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),
057            // Update query for InputCheck
058            @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_INPUTCHECK", query = "update CoordinatorActionBean w set w.status = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.actionXml = :actionXml, w.missingDependencies = :missingDependencies where w.id = :id"),
059            // Update query for Push-based missing dependency check
060            @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK", query = "update CoordinatorActionBean w set w.status = :status, w.lastModifiedTimestamp = :lastModifiedTime,  w.actionXml = :actionXml, w.pushMissingDependencies = :pushMissingDependencies where w.id = :id"),
061            // Update query for Start
062            @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_START", query = "update CoordinatorActionBean w set w.status =:status, w.lastModifiedTimestamp = :lastModifiedTime, w.runConf = :runConf, w.externalId = :externalId, w.pending = :pending, w.errorCode = :errorCode, w.errorMessage = :errorMessage  where w.id = :id"),
063    
064            @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_MODIFIED_DATE", query = "update CoordinatorActionBean w set w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),
065    
066            @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId and (a.status = 'SUCCEEDED' OR a.status = 'FAILED' OR a.status= 'KILLED')"),
067    
068            @NamedQuery(name = "DELETE_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId"),
069    
070            @NamedQuery(name = "DELETE_UNSCHEDULED_ACTION", query = "delete from CoordinatorActionBean a where a.id = :id and (a.status = 'WAITING' OR a.status = 'READY')"),
071    
072            // Query used by XTestcase to setup tables
073            @NamedQuery(name = "GET_COORD_ACTIONS", query = "select OBJECT(w) from CoordinatorActionBean w"),
074            // Select query used only by test cases
075            @NamedQuery(name = "GET_COORD_ACTION", query = "select OBJECT(a) from CoordinatorActionBean a where a.id = :id"),
076    
077            // Select query used by SLAService on restart
078            @NamedQuery(name = "GET_COORD_ACTION_FOR_SLA", query = "select a.id, a.jobId, a.status, a.externalId, a.lastModifiedTimestamp from CoordinatorActionBean a where a.id = :id"),
079            // Select query used by ActionInfo command
080            @NamedQuery(name = "GET_COORD_ACTION_FOR_INFO", query = "select a.id, a.jobId, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies from CoordinatorActionBean a where a.id = :id"),
081            // Select Query used by Timeout command
082            @NamedQuery(name = "GET_COORD_ACTION_FOR_TIMEOUT", query = "select a.id, a.jobId, a.status, a.runConf, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
083            // Select query used by InputCheck command
084            @NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.jobId, a.status, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.id = :id"),
085            // Select query used by CoordActionUpdate command
086            @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.externalId = :externalId"),
087            // Select query used by Check command
088            @NamedQuery(name = "GET_COORD_ACTION_FOR_CHECK", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
089            // Select query used by Start command
090            @NamedQuery(name = "GET_COORD_ACTION_FOR_START", query = "select a.id, a.jobId, a.status, a.pending, a.createdConf, a.slaXml, a.actionXml, a.externalId, a.errorMessage, a.errorCode, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
091    
092            @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_FIFO", query = "select a.id, a.jobId, a.status, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp"),
093    
094            @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_LIFO", query = "select a.id, a.jobId, a.status, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp desc"),
095    
096            @NamedQuery(name = "GET_COORD_RUNNING_ACTIONS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'RUNNING' OR a.status='SUBMITTED')"),
097    
098            @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"),
099    
100            @NamedQuery(name = "GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'WAITING'"),
101    
102            @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND (a.status = 'SUSPENDED' OR a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED')"),
103    
104            @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_STATUS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND a.status = :status"),
105    
106            @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"),
107            // Query to retrieve Coordinator actions sorted by nominal time
108            @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select a.id, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.jobId, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"),
109            // Query to maintain backward compatibility for coord job info command
110            @NamedQuery(name = "GET_ALL_COLS_FOR_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"),
111            // Query to retrieve action id, action status, pending status and external Id of not completed Coordinator actions
112            @NamedQuery(name = "GET_COORD_ACTIONS_NOT_COMPLETED", query = "select a.id, a.status, a.pending, a.externalId, a.pushMissingDependencies, a.nominalTimestamp, a.createdTimestamp, a.jobId from CoordinatorActionBean a where a.jobId = :jobId AND a.status <> 'FAILED' AND a.status <> 'TIMEDOUT' AND a.status <> 'SUCCEEDED' AND a.status <> 'KILLED'"),
113    
114            // Query to retrieve action id, action status, pending status and external Id of running Coordinator actions
115            @NamedQuery(name = "GET_COORD_ACTIONS_RUNNING", query = "select a.id, a.status, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId and a.status = 'RUNNING'"),
116    
117            // Query to retrieve action id, action status, pending status and external Id of suspended Coordinator actions
118            @NamedQuery(name = "GET_COORD_ACTIONS_SUSPENDED", query = "select a.id, a.status, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId and a.status = 'SUSPENDED'"),
119    
120            // Query to retrieve count of Coordinator actions which are pending
121            @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending > 0"),
122    
123            // Query to retrieve status of Coordinator actions
124            @NamedQuery(name = "GET_COORD_ACTIONS_STATUS", query = "select a.status from CoordinatorActionBean a where a.jobId = :jobId"),
125    
126            @NamedQuery(name = "GET_COORD_ACTION_FOR_COORD_JOB_BY_ACTION_NUMBER", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND a.actionNumber = :actionNumber"),
127    
128            @NamedQuery(name = "GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME", query = "select a.jobId from CoordinatorActionBean a where a.lastModifiedTimestamp >= :lastModifiedTime"),
129    
130            //Used by coordinator store only
131            @NamedQuery(name = "GET_RUNNING_ACTIONS_FOR_COORD_JOB", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'RUNNING'"),
132    
133            @NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select a.id from CoordinatorActionBean a where a.status = 'RUNNING' AND a.lastModifiedTimestamp <= :lastModifiedTime"),
134    
135            @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", query = "select a.id, a.jobId, a.status, a.externalId, a.pushMissingDependencies from CoordinatorActionBean a where (a.status = 'WAITING' OR a.status = 'SUBMITTED') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
136    
137            @NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select a.id, a.jobId, a.status, a.externalId from CoordinatorActionBean a where a.pending > 0 AND (a.status = 'SUSPENDED' OR a.status = 'KILLED' OR a.status = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
138            // Select query used by rerun, requires almost all columns so select * is used
139            @NamedQuery(name = "GET_ACTIONS_FOR_DATES", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
140            // Select query used by log
141            @NamedQuery(name = "GET_ACTION_IDS_FOR_DATES", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
142            // Select query used by rerun, requires almost all columns so select * is used
143            @NamedQuery(name = "GET_ACTION_FOR_NOMINALTIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.nominalTimestamp = :nominalTime"),
144    
145            @NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w) from CoordinatorActionBean w")})
146    
147    @NamedNativeQueries({
148    
149        @NamedNativeQuery(name = "GET_READY_ACTIONS_GROUP_BY_JOBID", query = "select a.job_id as job_id, MIN(a.last_modified_time) as min_lmt from COORD_ACTIONS a where a.status = 'READY' GROUP BY a.job_id HAVING MIN(a.last_modified_time) < ?", resultSetMapping = "CoordActionJobIdLmt")
150            })
151    public class CoordinatorActionBean extends JsonCoordinatorAction implements
152            Writable {
153        @Basic
154        @Index
155        @Column(name = "job_id")
156        private String jobId;
157    
158        @Basic
159        @Index
160        @Column(name = "status")
161        private String status = null;
162    
163        @Basic
164        @Index
165        @Column(name = "nominal_time")
166        private java.sql.Timestamp nominalTimestamp = null;
167    
168        @Basic
169        @Index
170        @Column(name = "last_modified_time")
171        private java.sql.Timestamp lastModifiedTimestamp = null;
172    
173        @Basic
174        @Index
175        @Column(name = "created_time")
176        private java.sql.Timestamp createdTimestamp = null;
177    
178        @Basic
179        @Index
180        @Column(name = "rerun_time")
181        private java.sql.Timestamp rerunTimestamp = null;
182    
183        @Basic
184        @Index
185        @Column(name = "external_id")
186        private String externalId;
187    
188        @Column(name = "sla_xml")
189        @Lob
190        private String slaXml = null;
191    
192        @Basic
193        @Column(name = "pending")
194        private int pending = 0;
195    
196        public CoordinatorActionBean() {
197        }
198    
199        /**
200         * Serialize the coordinator bean to a data output.
201         *
202         * @param dataOutput data output.
203         * @throws IOException thrown if the coordinator bean could not be
204         *         serialized.
205         */
206        @Override
207        public void write(DataOutput dataOutput) throws IOException {
208            WritableUtils.writeStr(dataOutput, getJobId());
209            WritableUtils.writeStr(dataOutput, getType());
210            WritableUtils.writeStr(dataOutput, getId());
211            WritableUtils.writeStr(dataOutput, getCreatedConf());
212            WritableUtils.writeStr(dataOutput, getStatus().toString());
213            dataOutput.writeInt(getActionNumber());
214            WritableUtils.writeStr(dataOutput, getRunConf());
215            WritableUtils.writeStr(dataOutput, getExternalStatus());
216            WritableUtils.writeStr(dataOutput, getTrackerUri());
217            WritableUtils.writeStr(dataOutput, getConsoleUrl());
218            WritableUtils.writeStr(dataOutput, getErrorCode());
219            WritableUtils.writeStr(dataOutput, getErrorMessage());
220            dataOutput.writeLong((getCreatedTime() != null) ? getCreatedTime().getTime() : -1);
221            dataOutput.writeLong((getLastModifiedTime() != null) ? getLastModifiedTime().getTime() : -1);
222        }
223    
224        /**
225         * Deserialize a coordinator bean from a data input.
226         *
227         * @param dataInput data input.
228         * @throws IOException thrown if the workflow bean could not be
229         *         deserialized.
230         */
231        @Override
232        public void readFields(DataInput dataInput) throws IOException {
233            setJobId(WritableUtils.readStr(dataInput));
234            setType(WritableUtils.readStr(dataInput));
235            setId(WritableUtils.readStr(dataInput));
236            setCreatedConf(WritableUtils.readStr(dataInput));
237            setStatus(CoordinatorAction.Status.valueOf(WritableUtils.readStr(dataInput)));
238            setActionNumber(dataInput.readInt());
239            setRunConf(WritableUtils.readStr(dataInput));
240            setExternalStatus(WritableUtils.readStr(dataInput));
241            setTrackerUri(WritableUtils.readStr(dataInput));
242            setConsoleUrl(WritableUtils.readStr(dataInput));
243            setErrorCode(WritableUtils.readStr(dataInput));
244            setErrorMessage(WritableUtils.readStr(dataInput));
245            long d = dataInput.readLong();
246            if (d != -1) {
247                setCreatedTime(new Date(d));
248            }
249            d = dataInput.readLong();
250            if (d != -1) {
251                setLastModifiedTime(new Date(d));
252            }
253        }
254    
255        @Override
256        public String getJobId() {
257            return this.jobId;
258        }
259    
260        @Override
261        public void setJobId(String id) {
262            super.setJobId(id);
263            this.jobId = id;
264        }
265    
266        @Override
267        public Status getStatus() {
268            return Status.valueOf(status);
269        }
270    
271        /**
272         * Return the status in string
273         * @return
274         */
275        public String getStatusStr() {
276            return status;
277        }
278    
279        @Override
280        public void setStatus(Status status) {
281            super.setStatus(status);
282            this.status = status.toString();
283        }
284    
285        @Override
286        public void setCreatedTime(Date createdTime) {
287            this.createdTimestamp = DateUtils.convertDateToTimestamp(createdTime);
288            super.setCreatedTime(createdTime);
289        }
290    
291        public void setRerunTime(Date rerunTime) {
292            this.rerunTimestamp = DateUtils.convertDateToTimestamp(rerunTime);
293        }
294    
295        @Override
296        public void setNominalTime(Date nominalTime) {
297            this.nominalTimestamp = DateUtils.convertDateToTimestamp(nominalTime);
298            super.setNominalTime(nominalTime);
299        }
300    
301        @Override
302        public void setLastModifiedTime(Date lastModifiedTime) {
303            this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime);
304            super.setLastModifiedTime(lastModifiedTime);
305        }
306    
307        @Override
308        public Date getCreatedTime() {
309            return DateUtils.toDate(createdTimestamp);
310        }
311    
312        public Timestamp getCreatedTimestamp() {
313            return createdTimestamp;
314        }
315    
316        public Date getRerunTime() {
317            return DateUtils.toDate(rerunTimestamp);
318        }
319    
320        public Timestamp getRerunTimestamp() {
321            return rerunTimestamp;
322        }
323    
324        @Override
325        public Date getLastModifiedTime() {
326            return DateUtils.toDate(lastModifiedTimestamp);
327        }
328    
329        public Timestamp getLastModifiedTimestamp() {
330            return lastModifiedTimestamp;
331        }
332    
333        @Override
334        public Date getNominalTime() {
335            return DateUtils.toDate(nominalTimestamp);
336        }
337    
338        public Timestamp getNominalTimestamp() {
339            return nominalTimestamp;
340        }
341    
342        @Override
343        public String getExternalId() {
344            return externalId;
345        }
346    
347        @Override
348        public void setExternalId(String externalId) {
349            super.setExternalId(externalId);
350            this.externalId = externalId;
351        }
352    
353        public String getSlaXml() {
354            return slaXml;
355        }
356    
357        public void setSlaXml(String slaXml) {
358            this.slaXml = slaXml;
359        }
360    
361        /**
362         * @return true if in terminal status
363         */
364        public boolean isTerminalStatus() {
365            boolean isTerminal = true;
366            switch (getStatus()) {
367                case WAITING:
368                case READY:
369                case SUBMITTED:
370                case RUNNING:
371                case SUSPENDED:
372                    isTerminal = false;
373                    break;
374                default:
375                    isTerminal = true;
376                    break;
377            }
378            return isTerminal;
379        }
380    
381        /**
382         * Return if the action is complete with failure.
383         *
384         * @return if the action is complete with failure.
385         */
386        public boolean isTerminalWithFailure() {
387            boolean result = false;
388            switch (getStatus()) {
389                case FAILED:
390                case KILLED:
391                case TIMEDOUT:
392                    result = true;
393            }
394            return result;
395        }
396    
397        /**
398         * Set some actions are in progress for particular coordinator action.
399         *
400         * @param pending set pending to true
401         */
402        public void setPending(int pending) {
403            this.pending = pending;
404        }
405    
406        /**
407         * increment pending and return it
408         *
409         * @return pending
410         */
411        public int incrementAndGetPending() {
412            this.pending++;
413            return pending;
414        }
415    
416        /**
417         * decrement pending and return it
418         *
419         * @return pending
420         */
421        public int decrementAndGetPending() {
422            this.pending = Math.max(this.pending - 1, 0);
423            return pending;
424        }
425    
426        /**
427         * Get some actions are in progress for particular bundle action.
428         *
429         * @return pending
430         */
431        public int getPending() {
432            return this.pending;
433        }
434    
435        /**
436         * Return if the action is pending.
437         *
438         * @return if the action is pending.
439         */
440        public boolean isPending() {
441            return pending > 0 ? true : false;
442        }
443    }