001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie;
019    
020    import java.io.DataInput;
021    import java.io.DataOutput;
022    import java.io.IOException;
023    import java.sql.Timestamp;
024    import java.util.Date;
025    
026    import javax.persistence.Basic;
027    import javax.persistence.Column;
028    import javax.persistence.ColumnResult;
029    import javax.persistence.Entity;
030    import javax.persistence.Lob;
031    import javax.persistence.NamedNativeQueries;
032    import javax.persistence.NamedNativeQuery;
033    import javax.persistence.NamedQueries;
034    import javax.persistence.NamedQuery;
035    import javax.persistence.SqlResultSetMapping;
036    
037    import org.apache.hadoop.io.Writable;
038    import org.apache.oozie.client.CoordinatorAction;
039    import org.apache.oozie.client.rest.JsonCoordinatorAction;
040    import org.apache.oozie.util.DateUtils;
041    import org.apache.oozie.util.WritableUtils;
042    import org.apache.openjpa.persistence.jdbc.Index;
043    
044    @SqlResultSetMapping(
045            name = "CoordActionJobIdLmt",
046            columns = {@ColumnResult(name = "job_id"),
047                @ColumnResult(name = "min_lmt")})
048    
049    @Entity
050    @NamedQueries({
051    
052        @NamedQuery(name = "UPDATE_COORD_ACTION", query = "update CoordinatorActionBean w set w.actionNumber = :actionNumber, w.actionXml = :actionXml, w.consoleUrl = :consoleUrl, w.createdConf = :createdConf, w.errorCode = :errorCode, w.errorMessage = :errorMessage, w.externalStatus = :externalStatus, w.missingDependencies = :missingDependencies, w.runConf = :runConf, w.timeOut = :timeOut, w.trackerUri = :trackerUri, w.type = :type, w.createdTimestamp = :createdTime, w.externalId = :externalId, w.jobId = :jobId, w.lastModifiedTimestamp = :lastModifiedTime, w.nominalTimestamp = :nominalTime, w.slaXml = :slaXml, w.status = :status where w.id = :id"),
053    
054        @NamedQuery(name = "UPDATE_COORD_ACTION_MIN", query = "update CoordinatorActionBean w set w.actionXml = :actionXml, w.missingDependencies = :missingDependencies, w.lastModifiedTimestamp = :lastModifiedTime, w.status = :status where w.id = :id"),
055    
056        @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId and (a.status = 'SUCCEEDED' OR a.status = 'FAILED' OR a.status= 'KILLED')"),
057    
058        @NamedQuery(name = "GET_COORD_ACTIONS", query = "select OBJECT(w) from CoordinatorActionBean w"),
059    
060        @NamedQuery(name = "GET_COMPLETED_ACTIONS_OLDER_THAN", query = "select OBJECT(a) from CoordinatorActionBean a where a.createdTimestamp < :createdTime and (a.status = 'SUCCEEDED' OR a.status = 'FAILED' OR a.status = 'KILLED')"),
061    
062        @NamedQuery(name = "GET_COORD_ACTION", query = "select OBJECT(a) from CoordinatorActionBean a where a.id = :id"),
063    
064        @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select OBJECT(a) from CoordinatorActionBean a where a.externalId = :externalId"),
065    
066        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_FIFO", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp"),
067    
068        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_LIFO", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp desc"),
069    
070        @NamedQuery(name = "GET_COORD_RUNNING_ACTIONS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'RUNNING' OR a.status='SUBMITTED')"),
071    
072        @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"),
073    
074        @NamedQuery(name = "GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'WAITING'"),
075    
076        @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND (a.status = 'SUSPENDED' OR a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED')"),
077    
078        @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_STATUS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND a.status = :status"),
079    
080        @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId"),
081    
082        @NamedQuery(name = "GET_COORD_ACTION_FOR_COORD_JOB_BY_ACTION_NUMBER", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.actionNumber = :actionNumber"),
083    
084        @NamedQuery(name = "GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME", query = "select OBJECT(w) from CoordinatorActionBean w where w.lastModifiedTimestamp >= :lastModifiedTime"),
085    
086        @NamedQuery(name = "GET_RUNNING_ACTIONS_FOR_COORD_JOB", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'RUNNING'"),
087    
088        @NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select OBJECT(a) from CoordinatorActionBean a where a.status = 'RUNNING' AND a.lastModifiedTimestamp <= :lastModifiedTime"),
089    
090        @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", query = "select OBJECT(a) from CoordinatorActionBean a where (a.status = 'WAITING' OR a.status = 'SUBMITTED') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
091    
092        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select OBJECT(a) from CoordinatorActionBean a where a.pending > 0 AND (a.status = 'SUSPENDED' OR a.status = 'KILLED' OR a.status = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
093    
094        @NamedQuery(name = "GET_ACTIONS_FOR_DATES", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
095    
096        @NamedQuery(name = "GET_ACTION_FOR_NOMINALTIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.nominalTimestamp = :nominalTime"),
097    
098        @NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w) from CoordinatorActionBean w")})
099    
100    @NamedNativeQueries({
101    
102        @NamedNativeQuery(name = "GET_READY_ACTIONS_GROUP_BY_JOBID", query = "select a.job_id as job_id, MIN(a.last_modified_time) as min_lmt from COORD_ACTIONS a where a.status = 'READY' GROUP BY a.job_id HAVING MIN(a.last_modified_time) < ?", resultSetMapping = "CoordActionJobIdLmt")
103            })
104    public class CoordinatorActionBean extends JsonCoordinatorAction implements
105            Writable {
106        @Basic
107        @Index
108        @Column(name = "job_id")
109        private String jobId;
110    
111        @Basic
112        @Index
113        @Column(name = "status")
114        private String status = null;
115    
116        @Basic
117        @Column(name = "nominal_time")
118        private java.sql.Timestamp nominalTimestamp = null;
119    
120        @Basic
121        @Index
122        @Column(name = "last_modified_time")
123        private java.sql.Timestamp lastModifiedTimestamp = null;
124    
125        @Basic
126        @Index
127        @Column(name = "created_time")
128        private java.sql.Timestamp createdTimestamp = null;
129    
130        @Basic
131        @Index
132        @Column(name = "rerun_time")
133        private java.sql.Timestamp rerunTimestamp = null;
134    
135        @Basic
136        @Index
137        @Column(name = "external_id")
138        private String externalId;
139    
140        @Column(name = "sla_xml")
141        @Lob
142        private String slaXml = null;
143    
144        @Basic
145        @Column(name = "pending")
146        private int pending = 0;
147    
148        public CoordinatorActionBean() {
149        }
150    
151        /**
152         * Serialize the coordinator bean to a data output.
153         *
154         * @param dataOutput data output.
155         * @throws IOException thrown if the coordinator bean could not be serialized.
156         */
157        public void write(DataOutput dataOutput) throws IOException {
158            WritableUtils.writeStr(dataOutput, getJobId());
159            WritableUtils.writeStr(dataOutput, getType());
160            WritableUtils.writeStr(dataOutput, getId());
161            WritableUtils.writeStr(dataOutput, getCreatedConf());
162            WritableUtils.writeStr(dataOutput, getStatus().toString());
163            dataOutput.writeInt(getActionNumber());
164            WritableUtils.writeStr(dataOutput, getRunConf());
165            WritableUtils.writeStr(dataOutput, getExternalStatus());
166            WritableUtils.writeStr(dataOutput, getTrackerUri());
167            WritableUtils.writeStr(dataOutput, getConsoleUrl());
168            WritableUtils.writeStr(dataOutput, getErrorCode());
169            WritableUtils.writeStr(dataOutput, getErrorMessage());
170            dataOutput.writeLong((getCreatedTime() != null) ? getCreatedTime().getTime() : -1);
171            dataOutput.writeLong((getLastModifiedTime() != null) ? getLastModifiedTime().getTime() : -1);
172        }
173    
174        /**
175         * Deserialize a coordinator bean from a data input.
176         *
177         * @param dataInput data input.
178         * @throws IOException thrown if the workflow bean could not be deserialized.
179         */
180        public void readFields(DataInput dataInput) throws IOException {
181            setJobId(WritableUtils.readStr(dataInput));
182            setType(WritableUtils.readStr(dataInput));
183            setId(WritableUtils.readStr(dataInput));
184            setCreatedConf(WritableUtils.readStr(dataInput));
185            setStatus(CoordinatorAction.Status.valueOf(WritableUtils.readStr(dataInput)));
186            setActionNumber(dataInput.readInt());
187            setRunConf(WritableUtils.readStr(dataInput));
188            setExternalStatus(WritableUtils.readStr(dataInput));
189            setTrackerUri(WritableUtils.readStr(dataInput));
190            setConsoleUrl(WritableUtils.readStr(dataInput));
191            setErrorCode(WritableUtils.readStr(dataInput));
192            setErrorMessage(WritableUtils.readStr(dataInput));
193            long d = dataInput.readLong();
194            if (d != -1) {
195                setCreatedTime(new Date(d));
196            }
197            d = dataInput.readLong();
198            if (d != -1) {
199                setLastModifiedTime(new Date(d));
200            }
201        }
202    
203        @Override
204        public String getJobId() {
205            return this.jobId;
206        }
207    
208        @Override
209        public void setJobId(String id) {
210            super.setJobId(id);
211            this.jobId = id;
212        }
213    
214        @Override
215        public Status getStatus() {
216            return Status.valueOf(status);
217        }
218    
219        @Override
220        public void setStatus(Status status) {
221            super.setStatus(status);
222            this.status = status.toString();
223        }
224    
225        @Override
226        public void setCreatedTime(Date createdTime) {
227            this.createdTimestamp = DateUtils.convertDateToTimestamp(createdTime);
228            super.setCreatedTime(createdTime);
229        }
230    
231        public void setRerunTime(Date rerunTime) {
232            this.rerunTimestamp = DateUtils.convertDateToTimestamp(rerunTime);
233        }
234    
235        @Override
236        public void setNominalTime(Date nominalTime) {
237            this.nominalTimestamp = DateUtils.convertDateToTimestamp(nominalTime);
238            super.setNominalTime(nominalTime);
239        }
240    
241        @Override
242        public void setLastModifiedTime(Date lastModifiedTime) {
243            this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime);
244            super.setLastModifiedTime(lastModifiedTime);
245        }
246    
247        @Override
248        public Date getCreatedTime() {
249            return DateUtils.toDate(createdTimestamp);
250        }
251    
252        public Timestamp getCreatedTimestamp() {
253            return createdTimestamp;
254        }
255    
256        public Date getRerunTime() {
257            return DateUtils.toDate(rerunTimestamp);
258        }
259    
260        public Timestamp getRerunTimestamp() {
261            return rerunTimestamp;
262        }
263    
264        @Override
265        public Date getLastModifiedTime() {
266            return DateUtils.toDate(lastModifiedTimestamp);
267        }
268    
269        public Timestamp getLastModifiedTimestamp() {
270            return lastModifiedTimestamp;
271        }
272    
273        @Override
274        public Date getNominalTime() {
275            return DateUtils.toDate(nominalTimestamp);
276        }
277    
278        public Timestamp getNominalTimestamp() {
279            return nominalTimestamp;
280        }
281    
282        @Override
283        public String getExternalId() {
284            return externalId;
285        }
286    
287        @Override
288        public void setExternalId(String externalId) {
289            super.setExternalId(externalId);
290            this.externalId = externalId;
291        }
292    
293        public String getSlaXml() {
294            return slaXml;
295        }
296    
297        public void setSlaXml(String slaXml) {
298            this.slaXml = slaXml;
299        }
300    
301        /**
302         * @return true if in terminal status
303         */
304        public boolean isTerminalStatus() {
305            boolean isTerminal = true;
306            switch (getStatus()) {
307                case WAITING:
308                case READY:
309                case SUBMITTED:
310                case RUNNING:
311                case SUSPENDED:
312                    isTerminal = false;
313                    break;
314                default:
315                    isTerminal = true;
316                    break;
317            }
318            return isTerminal;
319        }
320    
321        /**
322         * Set some actions are in progress for particular coordinator action.
323         *
324         * @param pending set pending to true
325         */
326        public void setPending(int pending) {
327            this.pending = pending;
328        }
329    
330        /**
331         * increment pending and return it
332         *
333         * @return pending
334         */
335        public int incrementAndGetPending() {
336            this.pending++;
337            return pending;
338        }
339    
340        /**
341         * decrement pending and return it
342         *
343         * @return pending
344         */
345        public int decrementAndGetPending() {
346            this.pending = Math.max(this.pending-1, 0);
347            return pending;
348        }
349    
350        /**
351         * Get some actions are in progress for particular bundle action.
352         *
353         * @return pending
354         */
355        public int getPending() {
356            return this.pending;
357        }
358    
359        /**
360         * Return if the action is pending.
361         *
362         * @return if the action is pending.
363         */
364        public boolean isPending() {
365            return pending > 0 ? true : false;
366        }
367    }