001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie;
020
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.IOException;
024import java.sql.Timestamp;
025import java.util.Date;
026
027import javax.persistence.Basic;
028import javax.persistence.Column;
029import javax.persistence.Entity;
030import javax.persistence.Id;
031import javax.persistence.NamedQueries;
032import javax.persistence.NamedQuery;
033import javax.persistence.Table;
034
035import org.apache.hadoop.io.Writable;
036import org.apache.oozie.client.Job.Status;
037import org.apache.oozie.client.rest.JsonBean;
038import org.apache.oozie.util.DateUtils;
039import org.apache.oozie.util.WritableUtils;
040import org.apache.openjpa.persistence.jdbc.Index;
041import org.json.simple.JSONObject;
042
043@Entity
044@Table(name = "BUNDLE_ACTIONS")
045@NamedQueries( {
046        @NamedQuery(name = "DELETE_BUNDLE_ACTION", query = "delete from BundleActionBean w where w.bundleActionId = :bundleActionId"),
047
048        @NamedQuery(name = "UPDATE_BUNDLE_ACTION_PENDING_MODTIME", query = "update BundleActionBean w set w.lastModifiedTimestamp = :lastModifiedTime, w.pending = :pending where w.bundleActionId = :bundleActionId"),
049
050        @NamedQuery(name = "UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME", query = "update BundleActionBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.pending = :pending where w.bundleActionId = :bundleActionId"),
051
052        @NamedQuery(name = "UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID", query = "update BundleActionBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.pending = :pending, w.coordId = :coordId where w.bundleActionId = :bundleActionId"),
053
054        @NamedQuery(name = "GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE", query = "select OBJECT(w) from BundleActionBean w where w.bundleId = :bundleId AND w.statusStr <> 'IGNORED'"),
055
056        @NamedQuery(name = "GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE", query = "select w.coordId, w.statusStr, w.pending from BundleActionBean w where w.bundleId = :bundleId AND w.statusStr <> 'IGNORED'"),
057
058        @NamedQuery(name = "GET_BUNDLE_ACTIONS", query = "select OBJECT(w) from BundleActionBean w"),
059
060        @NamedQuery(name = "GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN", query = "select w.bundleActionId, w.bundleId, w.statusStr, w.coordId, w.coordName from BundleActionBean w where w.pending > 0 AND w.lastModifiedTimestamp <= :lastModifiedTime"),
061
062        @NamedQuery(name = "GET_BUNDLE_ACTION", query = "select OBJECT(w) from BundleActionBean w where w.bundleActionId = :bundleActionId"),
063
064        @NamedQuery(name = "GET_BUNDLE_ACTIONS_COUNT", query = "select count(w) from BundleActionBean w"),
065
066        @NamedQuery(name = "GET_BUNDLE_ACTIONS_COUNT_BY_JOB", query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId"),
067
068        @NamedQuery(name = "GET_BUNDLE_ACTIONS_PENDING_TRUE_COUNT", query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId AND w.pending > 0"),
069
070        @NamedQuery(name = "GET_BUNDLE_ACTIONS_NOT_EQUAL_STATUS_COUNT", query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId AND w.statusStr <> :status"),
071
072        @NamedQuery(name = "GET_BUNDLE_ACTIONS_NOT_TERMINATE_STATUS_COUNT", query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId AND (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' OR w.statusStr = 'RUNNINGWITHERROR' OR w.statusStr = 'SUSPENDED' OR w.statusStr = 'SUSPENDEDWITHERROR' OR w.statusStr = 'PREPSUSPENDED' OR w.statusStr = 'PAUSED' OR  w.statusStr = 'PAUSEDWITHERROR' OR w.statusStr = 'PREPPAUSED')"),
073
074        @NamedQuery(name = "GET_BUNDLE_ACTIONS_FAILED_NULL_COORD_COUNT", query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId AND w.statusStr = 'FAILED' AND w.coordId IS NULL"),
075
076        @NamedQuery(name = "GET_BUNDLE_ACTIONS_OLDER_THAN", query = "select OBJECT(w) from BundleActionBean w order by w.lastModifiedTimestamp"),
077
078        @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_BUNDLE", query = "delete from BundleActionBean a where a.bundleId = :bundleId and (a.statusStr = 'SUCCEEDED' OR a.statusStr = 'FAILED' OR a.statusStr= 'KILLED' OR a.statusStr = 'DONEWITHERROR')"),
079
080        @NamedQuery(name = "DELETE_ACTIONS_FOR_BUNDLE", query = "delete from BundleActionBean a where a.bundleId  IN (:bundleId)")})
081public class BundleActionBean implements Writable, JsonBean {
082
083    @Id
084    @Column(name = "bundle_action_id")
085    private String bundleActionId = null;
086
087    @Index
088    @Column(name = "bundle_id")
089    private String bundleId = null;
090
091    @Column(name = "coord_name")
092    private String coordName = null;
093
094    @Basic
095    @Column(name = "coord_id")
096    private String coordId = null;
097
098    @Basic
099    @Column(name = "status")
100    private String statusStr = null;
101
102    @Basic
103    @Column(name = "critical")
104    private int critical = 0;
105
106    @Basic
107    @Column(name = "pending")
108    private int pending = 0;
109
110    @Basic
111    @Column(name = "last_modified_time")
112    private java.sql.Timestamp lastModifiedTimestamp = null;
113
114    /**
115     * bundleActionId to set
116     *
117     * @param bundleActionId the bundleActionId to set
118     */
119    public void setBundleActionId(String bundleActionId) {
120        this.bundleActionId = bundleActionId;
121    }
122
123    /**
124     * Get the Bundle Action Id.
125     *
126     * @return the bundleActionId
127     */
128    public String getBundleActionId() {
129        return bundleActionId;
130    }
131
132    /**
133     * Get the BundleId
134     *
135     * @return bundleId
136     */
137    public String getBundleId() {
138        return bundleId;
139    }
140
141    /**
142     * Set the Bundle Id.
143     *
144     * @param bundleId the bundle Id
145     */
146    public void setBundleId(String bundleId) {
147        this.bundleId = bundleId;
148    }
149
150    /**
151     * Get the Coordinator name.
152     *
153     * @return coordName
154     */
155    public String getCoordName() {
156        return coordName;
157    }
158
159    /**
160     * Set the Coordinator name.
161     *
162     * @param coordName the Coordinator name
163     */
164    public void setCoordName(String coordName) {
165        this.coordName = coordName;
166    }
167
168    /**
169     * Get the coordinator Id.
170     *
171     * @return the coordId
172     */
173    public String getCoordId() {
174        return coordId;
175    }
176
177    /**
178     * Set the coordinator Id.
179     *
180     * @param coordId the coordinator id
181     */
182    public void setCoordId(String coordId) {
183        this.coordId = coordId;
184    }
185
186    /**
187     * Get the Status of the Bundle Action
188     *
189     * @return status object
190     */
191    public Status getStatus() {
192        return Status.valueOf(this.statusStr);
193    }
194
195    /**
196     * Get the Status of the Bundle Action
197     *
198     * @return status string
199     */
200    public String getStatusStr() {
201        return statusStr;
202    }
203
204    /**
205     * Set the Status of the Bundle Action
206     *
207     * @param statusStr status string
208     */
209    public void setStatusStr(String statusStr) {
210        this.statusStr = statusStr;
211    }
212
213    /**
214     * Set the Status of the Bundle Action
215     *
216     * @param val the status to set
217     */
218    public void setStatus(Status val) {
219        this.statusStr = val.toString();
220    }
221
222    /**
223     * Set that this bundle action is critical.
224     */
225    public void setCritical() {
226        this.critical = 1;
227    }
228
229    /**
230     * Reseset that this bundle action is not critical.
231     */
232    public void resetCritical() {
233        this.critical = 0;
234    }
235
236    /**
237     * Return if the action is critical.
238     *
239     * @return if the action is critical.
240     */
241    public boolean isCritical() {
242        return critical == 1 ? true : false;
243    }
244
245    /**
246     * Set some actions are in progress for particular bundle action.
247     *
248     * @param pending set pending to true
249     */
250    public void setPending(int pending) {
251        this.pending = pending;
252    }
253
254    /**
255     * increment pending and return it
256     *
257     * @return pending
258     */
259    public int incrementAndGetPending() {
260        this.pending++;
261        return pending;
262    }
263
264    /**
265     * decrement pending and return it
266     *
267     * @return pending
268     */
269    public int decrementAndGetPending() {
270        this.pending = Math.max(this.pending-1, 0);
271        return pending;
272    }
273
274    /**
275     * Get some actions are in progress for particular bundle action.
276     *
277     * @return pending
278     */
279    public int getPending() {
280        return this.pending;
281    }
282
283    /**
284     * Return if the action is pending.
285     *
286     * @return if the action is pending.
287     */
288    public boolean isPending() {
289        return pending > 0 ? true : false;
290    }
291
292    /**
293     * @return true if in terminal status
294     */
295    public boolean isTerminalStatus() {
296        boolean isTerminal = false;
297        switch (getStatus()) {
298            case SUCCEEDED:
299            case FAILED:
300            case KILLED:
301            case DONEWITHERROR:
302                isTerminal = true;
303                break;
304            default:
305                isTerminal = false;
306                break;
307        }
308        return isTerminal;
309    }
310
311    /**
312     * Set Last modified time.
313     *
314     * @param lastModifiedTimestamp the lastModifiedTimestamp to set
315     */
316    public void setLastModifiedTimestamp(java.sql.Timestamp lastModifiedTimestamp) {
317        this.lastModifiedTimestamp = lastModifiedTimestamp;
318    }
319
320    /**
321     * Set Last modified time.
322     *
323     * @param lastModifiedTime the lastModifiedTime to set
324     */
325    public void setLastModifiedTime(Date lastModifiedTime) {
326        this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime);
327    }
328
329    /**
330     * Get Last modified time.
331     *
332     * @return lastModifiedTime
333     */
334    public Date getLastModifiedTime() {
335        return DateUtils.toDate(lastModifiedTimestamp);
336    }
337
338    /**
339     * Get Last modified time.
340     *
341     * @return lastModifiedTimestamp
342     */
343    public Timestamp getLastModifiedTimestamp() {
344        return lastModifiedTimestamp;
345    }
346
347    @Override
348    public void write(DataOutput dataOutput) throws IOException {
349        WritableUtils.writeStr(dataOutput, getBundleActionId());
350        WritableUtils.writeStr(dataOutput, getBundleId());
351        WritableUtils.writeStr(dataOutput, getCoordName());
352        WritableUtils.writeStr(dataOutput, getCoordId());
353        WritableUtils.writeStr(dataOutput, getStatusStr());
354        dataOutput.writeInt(critical);
355        dataOutput.writeInt(pending);
356        dataOutput.writeLong((getLastModifiedTimestamp() != null) ? getLastModifiedTimestamp().getTime() : -1);
357    }
358
359    @Override
360    public void readFields(DataInput dataInput) throws IOException {
361        setBundleActionId(WritableUtils.readStr(dataInput));
362        setBundleId(WritableUtils.readStr(dataInput));
363        setCoordName(WritableUtils.readStr(dataInput));
364        setCoordId(WritableUtils.readStr(dataInput));
365        setStatus(Status.valueOf(WritableUtils.readStr(dataInput)));
366        critical = dataInput.readInt();
367        pending = dataInput.readInt();
368        long d = dataInput.readLong();
369        if (d != -1) {
370            setLastModifiedTime(new Date(d));
371        }
372    }
373
374    @Override
375    public JSONObject toJSONObject() {
376        return null;
377    }
378
379    @Override
380    public JSONObject toJSONObject(String timeZoneId) {
381        return null;
382    }
383}