001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.IOException;
023import java.sql.Timestamp;
024import java.text.MessageFormat;
025import java.util.Date;
026import java.util.List;
027
028import javax.persistence.Basic;
029import javax.persistence.Column;
030import javax.persistence.ColumnResult;
031import javax.persistence.Entity;
032import javax.persistence.Id;
033import javax.persistence.Lob;
034import javax.persistence.NamedNativeQueries;
035import javax.persistence.NamedNativeQuery;
036import javax.persistence.NamedQueries;
037import javax.persistence.NamedQuery;
038import javax.persistence.SqlResultSetMapping;
039import javax.persistence.Table;
040
041import org.apache.hadoop.io.Writable;
042import org.apache.oozie.client.CoordinatorAction;
043import org.apache.oozie.client.rest.JsonBean;
044import org.apache.oozie.client.rest.JsonTags;
045import org.apache.oozie.client.rest.JsonUtils;
046import org.apache.oozie.util.DateUtils;
047import org.apache.oozie.util.WritableUtils;
048import org.apache.openjpa.persistence.jdbc.Index;
049import org.apache.openjpa.persistence.jdbc.Strategy;
050import org.json.simple.JSONArray;
051import org.json.simple.JSONObject;
052
053@SqlResultSetMapping(
054        name = "CoordActionJobIdLmt",
055        columns = {@ColumnResult(name = "job_id"),
056            @ColumnResult(name = "min_lmt")})
057
058@Entity
059@NamedQueries({
060
061        @NamedQuery(name = "UPDATE_COORD_ACTION", query = "update CoordinatorActionBean w set w.actionNumber = :actionNumber, w.actionXml = :actionXml, w.consoleUrl = :consoleUrl, w.createdConf = :createdConf, w.errorCode = :errorCode, w.errorMessage = :errorMessage, w.externalStatus = :externalStatus, w.missingDependencies = :missingDependencies, w.runConf = :runConf, w.timeOut = :timeOut, w.trackerUri = :trackerUri, w.type = :type, w.createdTimestamp = :createdTime, w.externalId = :externalId, w.jobId = :jobId, w.lastModifiedTimestamp = :lastModifiedTime, w.nominalTimestamp = :nominalTime, w.slaXml = :slaXml, w.statusStr = :status where w.id = :id"),
062
063        @NamedQuery(name = "UPDATE_COORD_ACTION_MIN", query = "update CoordinatorActionBean w set w.actionXml = :actionXml, w.missingDependencies = :missingDependencies, w.lastModifiedTimestamp = :lastModifiedTime, w.statusStr = :status where w.id = :id"),
064        // Query to update the action status, pending status and last modified time stamp of a Coordinator action
065        @NamedQuery(name = "UPDATE_COORD_ACTION_STATUS_PENDING_TIME", query = "update CoordinatorActionBean w set w.statusStr =:status, w.pending =:pending, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),
066        // Update query for InputCheck
067        @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_INPUTCHECK", query = "update CoordinatorActionBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.actionXml = :actionXml, w.missingDependencies = :missingDependencies where w.id = :id"),
068        // Update query for Push-based missing dependency check
069        @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK", query = "update CoordinatorActionBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModifiedTime,  w.actionXml = :actionXml, w.pushMissingDependencies = :pushMissingDependencies where w.id = :id"),
        // Update query that sets both missingDependencies and pushMissingDependencies
071        @NamedQuery(name = "UPDATE_COORD_ACTION_DEPENDENCIES", query = "update CoordinatorActionBean w set w.missingDependencies = :missingDependencies, w.pushMissingDependencies = :pushMissingDependencies where w.id = :id"),
072        // Update query for Start
073        @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_START", query = "update CoordinatorActionBean w set w.statusStr =:status, w.lastModifiedTimestamp = :lastModifiedTime, w.runConf = :runConf, w.externalId = :externalId, w.pending = :pending, w.errorCode = :errorCode, w.errorMessage = :errorMessage  where w.id = :id"),
074
075        @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_MODIFIED_DATE", query = "update CoordinatorActionBean w set w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),
076
077        @NamedQuery(name = "UPDATE_COORD_ACTION_RERUN", query = "update CoordinatorActionBean w set w.actionXml =:actionXml, w.statusStr = :status, w.externalId = :externalId, w.externalStatus = :externalStatus, w.rerunTimestamp = :rerunTime, w.lastModifiedTimestamp = :lastModifiedTime, w.createdTimestamp = :createdTime, w.createdConf = :createdConf, w.runConf = :runConf, w.missingDependencies = :missingDependencies, w.pushMissingDependencies = :pushMissingDependencies, w.errorCode = :errorCode, w.errorMessage = :errorMessage where w.id = :id"),
078
079        @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId and (a.statusStr = 'SUCCEEDED' OR a.statusStr = 'FAILED' OR a.statusStr= 'KILLED')"),
080
081        @NamedQuery(name = "DELETE_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId"),
082
083        @NamedQuery(name = "DELETE_ACTIONS_FOR_LONG_RUNNING_COORDINATOR", query = "delete from CoordinatorActionBean a where a.id = :actionId"),
084
085        @NamedQuery(name = "DELETE_UNSCHEDULED_ACTION", query = "delete from CoordinatorActionBean a where a.id = :id and (a.statusStr = 'WAITING' OR a.statusStr = 'READY')"),
086
087        // Query used by XTestcase to setup tables
088        @NamedQuery(name = "GET_COORD_ACTIONS", query = "select OBJECT(w) from CoordinatorActionBean w"),
089        // Select query used only by test cases
090        @NamedQuery(name = "GET_COORD_ACTION", query = "select OBJECT(a) from CoordinatorActionBean a where a.id = :id"),
091
092        // Select query used by SLAService on restart
093        @NamedQuery(name = "GET_COORD_ACTION_FOR_SLA", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.lastModifiedTimestamp from CoordinatorActionBean a where a.id = :id"),
094        // Select query used by ActionInfo command
095        @NamedQuery(name = "GET_COORD_ACTION_FOR_INFO", query = "select a.id, a.jobId, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.statusStr, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies from CoordinatorActionBean a where a.id = :id"),
096        // Select Query used by Timeout and skip commands
097        @NamedQuery(name = "GET_COORD_ACTION_FOR_TIMEOUT", query = "select a.id, a.jobId, a.statusStr, a.runConf, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
098        // Select query used by InputCheck command
099        @NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.jobId, a.statusStr, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.id = :id"),
100        // Select query used by CoordActionUpdate command
101        @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select a.id, a.jobId, a.statusStr, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.externalId = :externalId"),
102        // Select query used by Check command
103        @NamedQuery(name = "GET_COORD_ACTION_FOR_CHECK", query = "select a.id, a.jobId, a.statusStr, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
104        // Select query used by Start command
105        @NamedQuery(name = "GET_COORD_ACTION_FOR_START", query = "select a.id, a.jobId, a.statusStr, a.pending, a.createdConf, a.slaXml, a.actionXml, a.externalId, a.errorMessage, a.errorCode, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
106
107        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_FIFO", query = "select a.id, a.jobId, a.statusStr, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr = 'READY' order by a.nominalTimestamp"),
108
109        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_LIFO", query = "select a.id, a.jobId, a.statusStr, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr = 'READY' order by a.nominalTimestamp desc"),
110
111        @NamedQuery(name = "GET_COORD_RUNNING_ACTIONS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'RUNNING' OR a.statusStr='SUBMITTED')"),
112
113        @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"),
114
115        @NamedQuery(name = "GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr = 'WAITING'"),
116
117        @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND (a.statusStr = 'SUSPENDED' OR a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED')"),
118
119        @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_STATUS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND a.statusStr = :status"),
120
121        @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"),
122        // Query to retrieve Coordinator actions sorted by nominal time
123        @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select a.id, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.jobId, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.statusStr, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"),
124        // Query to maintain backward compatibility for coord job info command
125        @NamedQuery(name = "GET_ALL_COLS_FOR_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"),
126        // Query to retrieve action id, action status, pending status and external Id of not completed Coordinator actions
127        @NamedQuery(name = "GET_COORD_ACTIONS_NOT_COMPLETED", query = "select a.id, a.statusStr, a.pending, a.externalId, a.pushMissingDependencies, a.nominalTimestamp, a.createdTimestamp, a.jobId from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr <> 'FAILED' AND a.statusStr <> 'TIMEDOUT' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'IGNORED'"),
128
129        // Query to retrieve action id, action status, pending status and external Id of running Coordinator actions
130        @NamedQuery(name = "GET_COORD_ACTIONS_RUNNING", query = "select a.id, a.statusStr, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId and a.statusStr = 'RUNNING'"),
131
132        // Query to retrieve action id, action status, pending status and external Id of suspended Coordinator actions
133        @NamedQuery(name = "GET_COORD_ACTIONS_SUSPENDED", query = "select a.id, a.statusStr, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId and a.statusStr = 'SUSPENDED'"),
134
135        // Query to retrieve count of Coordinator actions which are pending
136        @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending > 0"),
137
138        // Query to retrieve status of Coordinator actions
139        @NamedQuery(name = "GET_COORD_ACTIONS_STATUS_UNIGNORED", query = "select a.statusStr from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr <> 'IGNORED'"),
140
        // Query to retrieve the status of a single Coordinator action by its id
142        @NamedQuery(name = "GET_COORD_ACTION_STATUS", query = "select a.statusStr from CoordinatorActionBean a where a.id = :id"),
143
144        @NamedQuery(name = "GET_COORD_ACTION_FOR_COORD_JOB_BY_ACTION_NUMBER", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND a.actionNumber = :actionNumber"),
145
146        @NamedQuery(name = "GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME", query = "select a.jobId from CoordinatorActionBean a where a.lastModifiedTimestamp >= :lastModifiedTime"),
147
148        //Used by coordinator store only
149        @NamedQuery(name = "GET_RUNNING_ACTIONS_FOR_COORD_JOB", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.statusStr = 'RUNNING'"),
150
151        @NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select a.id from CoordinatorActionBean a where a.statusStr = 'RUNNING' AND a.lastModifiedTimestamp <= :lastModifiedTime"),
152
153        @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pushMissingDependencies from CoordinatorActionBean a where (a.statusStr = 'WAITING' OR a.statusStr = 'SUBMITTED') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
154
155        @NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pending from CoordinatorActionBean a where a.pending > 0 AND (a.statusStr = 'SUSPENDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
156        // Select query used by rerun, requires almost all columns so select * is used
157        @NamedQuery(name = "GET_ACTIONS_FOR_DATES", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED' OR a.statusStr = 'IGNORED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
158        // Select query used by log
159        @NamedQuery(name = "GET_ACTION_IDS_FOR_DATES", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
160        // Select query used by rerun, requires almost all columns so select * is used
161        @NamedQuery(name = "GET_ACTION_FOR_NOMINALTIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.nominalTimestamp = :nominalTime"),
162
163        @NamedQuery(name = "GET_ACTIONS_BY_DATES_FOR_KILL", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
164
165        @NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w) from CoordinatorActionBean w"),
166
167        @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_RUNNING_FOR_RANGE", query = "select count(w) from CoordinatorActionBean w where w.statusStr = 'RUNNING' and w.jobId= :jobId and w.id >= :startAction AND w.id <= :endAction"),
168
169        @NamedQuery(name = "GET_COORD_ACTIONS_MAX_MODIFIED_DATE_FOR_RANGE", query = "select max(w.lastModifiedTimestamp) from CoordinatorActionBean w where w.jobId= :jobId and w.id >= :startAction AND w.id <= :endAction")})
170
171@NamedNativeQueries({
172
173    @NamedNativeQuery(name = "GET_READY_ACTIONS_GROUP_BY_JOBID", query = "select a.job_id as job_id, MIN(a.last_modified_time) as min_lmt from COORD_ACTIONS a where a.status = 'READY' GROUP BY a.job_id HAVING MIN(a.last_modified_time) < ?", resultSetMapping = "CoordActionJobIdLmt")
174        })
175@Table(name = "COORD_ACTIONS")
176public class CoordinatorActionBean implements
177        Writable,CoordinatorAction,JsonBean {
178
179    @Id
180    private String id;
181
182    @Basic
183    @Index
184    @Column(name = "job_id")
185    private String jobId;
186
187    @Basic
188    @Index
189    @Column(name = "status")
190    private String statusStr = CoordinatorAction.Status.WAITING.toString();
191
192    @Basic
193    @Index
194    @Column(name = "nominal_time")
195    private java.sql.Timestamp nominalTimestamp = null;
196
197    @Basic
198    @Index
199    @Column(name = "last_modified_time")
200    private java.sql.Timestamp lastModifiedTimestamp = null;
201
202    @Basic
203    @Index
204    @Column(name = "created_time")
205    private java.sql.Timestamp createdTimestamp = null;
206
207    @Basic
208    @Index
209    @Column(name = "rerun_time")
210    private java.sql.Timestamp rerunTimestamp = null;
211
212    @Basic
213    @Index
214    @Column(name = "external_id")
215    private String externalId;
216
217    @Basic
218    @Column(name = "sla_xml")
219    @Lob
220    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
221    private StringBlob slaXml = null;
222
223    @Basic
224    @Column(name = "pending")
225    private int pending = 0;
226
227    @Basic
228    @Column(name = "job_type")
229    private String type;
230
231    @Basic
232    @Column(name = "action_number")
233    private int actionNumber;
234
235    @Basic
236    @Column(name = "created_conf")
237    @Lob
238    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
239    private StringBlob createdConf;
240
241    @Basic
242    @Column(name = "time_out")
243    private int timeOut = 0;
244
245    @Basic
246    @Column(name = "run_conf")
247    @Lob
248    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
249    private StringBlob runConf;
250
251    @Basic
252    @Column(name = "action_xml")
253    @Lob
254    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
255    private StringBlob actionXml;
256
257    @Basic
258    @Column(name = "missing_dependencies")
259    @Lob
260    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
261    private StringBlob missingDependencies;
262
263    @Basic
264    @Column(name = "push_missing_dependencies")
265    @Lob
266    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
267    private StringBlob pushMissingDependencies;
268
269    @Basic
270    @Column(name = "external_status")
271    private String externalStatus;
272
273    @Basic
274    @Column(name = "tracker_uri")
275    private String trackerUri;
276
277    @Basic
278    @Column(name = "console_url")
279    private String consoleUrl;
280
281    @Basic
282    @Column(name = "error_code")
283    private String errorCode;
284
285    @Basic
286    @Column(name = "error_message")
287    private String errorMessage;
288
289    @SuppressWarnings("unchecked")
290    public JSONObject toJSONObject() {
291        return toJSONObject("GMT");
292    }
293
294    public CoordinatorActionBean() {
295    }
296
297    /**
298     * Serialize the coordinator bean to a data output.
299     *
300     * @param dataOutput data output.
301     * @throws IOException thrown if the coordinator bean could not be
302     *         serialized.
303     */
304    @Override
305    public void write(DataOutput dataOutput) throws IOException {
306        WritableUtils.writeStr(dataOutput, getJobId());
307        WritableUtils.writeStr(dataOutput, getType());
308        WritableUtils.writeStr(dataOutput, getId());
309        WritableUtils.writeStr(dataOutput, getCreatedConf());
310        WritableUtils.writeStr(dataOutput, getStatus().toString());
311        dataOutput.writeInt(getActionNumber());
312        WritableUtils.writeStr(dataOutput, getRunConf());
313        WritableUtils.writeStr(dataOutput, getExternalStatus());
314        WritableUtils.writeStr(dataOutput, getTrackerUri());
315        WritableUtils.writeStr(dataOutput, getConsoleUrl());
316        WritableUtils.writeStr(dataOutput, getErrorCode());
317        WritableUtils.writeStr(dataOutput, getErrorMessage());
318        dataOutput.writeLong((getCreatedTime() != null) ? getCreatedTime().getTime() : -1);
319        dataOutput.writeLong((getLastModifiedTime() != null) ? getLastModifiedTime().getTime() : -1);
320    }
321
322    /**
323     * Deserialize a coordinator bean from a data input.
324     *
325     * @param dataInput data input.
326     * @throws IOException thrown if the workflow bean could not be
327     *         deserialized.
328     */
329    @Override
330    public void readFields(DataInput dataInput) throws IOException {
331        setJobId(WritableUtils.readStr(dataInput));
332        setType(WritableUtils.readStr(dataInput));
333        setId(WritableUtils.readStr(dataInput));
334        setCreatedConf(WritableUtils.readStr(dataInput));
335        setStatus(CoordinatorAction.Status.valueOf(WritableUtils.readStr(dataInput)));
336        setActionNumber(dataInput.readInt());
337        setRunConf(WritableUtils.readStr(dataInput));
338        setExternalStatus(WritableUtils.readStr(dataInput));
339        setTrackerUri(WritableUtils.readStr(dataInput));
340        setConsoleUrl(WritableUtils.readStr(dataInput));
341        setErrorCode(WritableUtils.readStr(dataInput));
342        setErrorMessage(WritableUtils.readStr(dataInput));
343        long d = dataInput.readLong();
344        if (d != -1) {
345            setCreatedTime(new Date(d));
346        }
347        d = dataInput.readLong();
348        if (d != -1) {
349            setLastModifiedTime(new Date(d));
350        }
351    }
352
353    @Override
354    public String getJobId() {
355        return this.jobId;
356    }
357
358    public void setJobId(String id) {
359        this.jobId = id;
360    }
361
362    @Override
363    public Status getStatus() {
364        return Status.valueOf(statusStr);
365    }
366
367    /**
368     * Return the status in string
369     * @return
370     */
371    public String getStatusStr() {
372        return statusStr;
373    }
374
375    public void setStatus(Status status) {
376        this.statusStr = status.toString();
377    }
378
379    public void setStatusStr(String statusStr) {
380        this.statusStr = statusStr;
381    }
382
383    public void setCreatedTime(Date createdTime) {
384        this.createdTimestamp = DateUtils.convertDateToTimestamp(createdTime);
385    }
386
387    public void setRerunTime(Date rerunTime) {
388        this.rerunTimestamp = DateUtils.convertDateToTimestamp(rerunTime);
389    }
390
391    public void setNominalTime(Date nominalTime) {
392        this.nominalTimestamp = DateUtils.convertDateToTimestamp(nominalTime);
393    }
394
395    public void setLastModifiedTime(Date lastModifiedTime) {
396        this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime);
397    }
398
399    public Date getCreatedTime() {
400        return DateUtils.toDate(createdTimestamp);
401    }
402
403    public Timestamp getCreatedTimestamp() {
404        return createdTimestamp;
405    }
406
407    public Date getRerunTime() {
408        return DateUtils.toDate(rerunTimestamp);
409    }
410
411    public Timestamp getRerunTimestamp() {
412        return rerunTimestamp;
413    }
414
415    @Override
416    public Date getLastModifiedTime() {
417        return DateUtils.toDate(lastModifiedTimestamp);
418    }
419
420    public Timestamp getLastModifiedTimestamp() {
421        return lastModifiedTimestamp;
422    }
423
424    @Override
425    public Date getNominalTime() {
426        return DateUtils.toDate(nominalTimestamp);
427    }
428
429    public Timestamp getNominalTimestamp() {
430        return nominalTimestamp;
431    }
432
433    @Override
434    public String getExternalId() {
435        return externalId;
436    }
437
438    public void setExternalId(String externalId) {
439        this.externalId = externalId;
440    }
441
442    public StringBlob getSlaXmlBlob() {
443        return slaXml;
444    }
445
446    public void setSlaXmlBlob(StringBlob slaXml) {
447        this.slaXml = slaXml;
448    }
449
450    public String getSlaXml() {
451        return slaXml == null ? null : slaXml.getString();
452    }
453
454    public void setSlaXml(String slaXml) {
455        if (this.slaXml == null) {
456            this.slaXml = new StringBlob(slaXml);
457        }
458        else {
459            this.slaXml.setString(slaXml);
460        }
461    }
462
463    /**
464     * @return true if in terminal status
465     */
466    public boolean isTerminalStatus() {
467        boolean isTerminal = true;
468        switch (getStatus()) {
469            case WAITING:
470            case READY:
471            case SUBMITTED:
472            case RUNNING:
473            case SUSPENDED:
474                isTerminal = false;
475                break;
476            default:
477                isTerminal = true;
478                break;
479        }
480        return isTerminal;
481    }
482
483    /**
484     * Return if the action is complete with failure.
485     *
486     * @return if the action is complete with failure.
487     */
488    public boolean isTerminalWithFailure() {
489        boolean result = false;
490        switch (getStatus()) {
491            case FAILED:
492            case KILLED:
493            case TIMEDOUT:
494                result = true;
495        }
496        return result;
497    }
498
499    /**
500     * Set some actions are in progress for particular coordinator action.
501     *
502     * @param pending set pending to true
503     */
504    public void setPending(int pending) {
505        this.pending = pending;
506    }
507
508    /**
509     * increment pending and return it
510     *
511     * @return pending
512     */
513    public int incrementAndGetPending() {
514        this.pending++;
515        return pending;
516    }
517
518    /**
519     * decrement pending and return it
520     *
521     * @return pending
522     */
523    public int decrementAndGetPending() {
524        this.pending = Math.max(this.pending - 1, 0);
525        return pending;
526    }
527
528    /**
529     * Get some actions are in progress for particular bundle action.
530     *
531     * @return pending
532     */
533    public int getPending() {
534        return this.pending;
535    }
536
537    /**
538     * Return if the action is pending.
539     *
540     * @return if the action is pending.
541     */
542    public boolean isPending() {
543        return pending > 0 ? true : false;
544    }
545
546    @Override
547    public String getId() {
548        return id;
549    }
550
551    public void setId(String id) {
552        this.id = id;
553    }
554
555    public String getType() {
556        return type;
557    }
558
559    public void setType(String type) {
560        this.type = type;
561    }
562
563    public void setActionNumber(int actionNumber) {
564        this.actionNumber = actionNumber;
565    }
566
567    @Override
568    public int getActionNumber() {
569        return actionNumber;
570    }
571
572    @Override
573    public String getCreatedConf() {
574        return createdConf == null ? null : createdConf.getString();
575    }
576
577    public void setCreatedConf(String createdConf) {
578        if (this.createdConf == null) {
579            this.createdConf = new StringBlob(createdConf);
580        }
581        else {
582            this.createdConf.setString(createdConf);
583        }
584    }
585
586    public void setCreatedConfBlob(StringBlob createdConf) {
587        this.createdConf = createdConf;
588    }
589
590    public StringBlob getCreatedConfBlob() {
591        return createdConf;
592    }
593
594    public void setRunConf(String runConf) {
595        if (this.runConf == null) {
596            this.runConf = new StringBlob(runConf);
597        }
598        else {
599            this.runConf.setString(runConf);
600        }
601    }
602
603    @Override
604    public String getRunConf() {
605        return runConf == null ? null : runConf.getString();
606    }
607
608    public void setRunConfBlob(StringBlob runConf) {
609        this.runConf = runConf;
610    }
611
612    public StringBlob getRunConfBlob() {
613        return runConf;
614    }
615
616
617    public void setMissingDependencies(String missingDependencies) {
618        if (this.missingDependencies == null) {
619            this.missingDependencies = new StringBlob(missingDependencies);
620        }
621        else {
622            this.missingDependencies.setString(missingDependencies);
623        }
624    }
625
626    @Override
627    public String getMissingDependencies() {
628        return missingDependencies == null ? null : missingDependencies.getString();
629    }
630
631    public void setMissingDependenciesBlob(StringBlob missingDependencies) {
632        this.missingDependencies = missingDependencies;
633    }
634
635    public StringBlob getMissingDependenciesBlob() {
636        return missingDependencies;
637    }
638
639    @Override
640    public String getPushMissingDependencies() {
641        return pushMissingDependencies == null ? null : pushMissingDependencies.getString();
642    }
643
644    public void setPushMissingDependencies(String pushMissingDependencies) {
645        if (this.pushMissingDependencies == null) {
646            this.pushMissingDependencies = new StringBlob(pushMissingDependencies);
647        }
648        else {
649            this.pushMissingDependencies.setString(pushMissingDependencies);
650        }
651    }
652
653    public void setPushMissingDependenciesBlob(StringBlob pushMissingDependencies) {
654        this.pushMissingDependencies = pushMissingDependencies;
655    }
656
657    public StringBlob getPushMissingDependenciesBlob() {
658        return pushMissingDependencies;
659    }
660
661    public String getExternalStatus() {
662        return externalStatus;
663    }
664
665    public void setExternalStatus(String externalStatus) {
666        this.externalStatus = externalStatus;
667    }
668
669    @Override
670    public String getTrackerUri() {
671        return trackerUri;
672    }
673
674    public void setTrackerUri(String trackerUri) {
675        this.trackerUri = trackerUri;
676    }
677
678    @Override
679    public String getConsoleUrl() {
680        return consoleUrl;
681    }
682
683    public void setConsoleUrl(String consoleUrl) {
684        this.consoleUrl = consoleUrl;
685    }
686
687    @Override
688    public String getErrorCode() {
689        return errorCode;
690    }
691
692    @Override
693    public String getErrorMessage() {
694        return errorMessage;
695    }
696
697    public void setErrorInfo(String errorCode, String errorMessage) {
698        this.errorCode = errorCode;
699        this.errorMessage = errorMessage;
700    }
701
702    public String getActionXml() {
703        return actionXml == null ? null : actionXml.getString();
704    }
705
706    public void setActionXml(String actionXml) {
707        if (this.actionXml == null) {
708            this.actionXml = new StringBlob(actionXml);
709        }
710        else {
711            this.actionXml.setString(actionXml);
712        }
713    }
714
715    public void setActionXmlBlob(StringBlob actionXml) {
716        this.actionXml = actionXml;
717    }
718
719    public StringBlob getActionXmlBlob() {
720        return actionXml;
721    }
722
723    @Override
724    public String toString() {
725        return MessageFormat.format("CoordinatorAction name[{0}] status[{1}]",
726                                    getId(), getStatus());
727    }
728
729    public int getTimeOut() {
730        return timeOut;
731    }
732
733    public void setTimeOut(int timeOut) {
734        this.timeOut = timeOut;
735    }
736
737
738    public void setErrorCode(String errorCode) {
739        this.errorCode = errorCode;
740    }
741
742    public void setErrorMessage(String errorMessage) {
743        this.errorMessage = errorMessage;
744    }
745
746    @SuppressWarnings("unchecked")
747    public JSONObject toJSONObject(String timeZoneId) {
748        JSONObject json = new JSONObject();
749        json.put(JsonTags.COORDINATOR_ACTION_ID, id);
750        json.put(JsonTags.COORDINATOR_JOB_ID, jobId);
751        json.put(JsonTags.COORDINATOR_ACTION_TYPE, type);
752        json.put(JsonTags.COORDINATOR_ACTION_NUMBER, actionNumber);
753        json.put(JsonTags.COORDINATOR_ACTION_CREATED_CONF, getCreatedConf());
754        json.put(JsonTags.COORDINATOR_ACTION_CREATED_TIME, JsonUtils
755                .formatDateRfc822(getCreatedTime(), timeZoneId));
756        json.put(JsonTags.COORDINATOR_ACTION_NOMINAL_TIME, JsonUtils
757                .formatDateRfc822(getNominalTime(), timeZoneId));
758        json.put(JsonTags.COORDINATOR_ACTION_EXTERNALID, externalId);
759        // json.put(JsonTags.COORDINATOR_ACTION_START_TIME, JsonUtils
760        // .formatDateRfc822(startTime), timeZoneId);
761        json.put(JsonTags.COORDINATOR_ACTION_STATUS, statusStr);
762        json.put(JsonTags.COORDINATOR_ACTION_RUNTIME_CONF, getRunConf());
763        json.put(JsonTags.COORDINATOR_ACTION_LAST_MODIFIED_TIME, JsonUtils
764                .formatDateRfc822(getLastModifiedTime(), timeZoneId));
765        // json.put(JsonTags.COORDINATOR_ACTION_START_TIME, JsonUtils
766        // .formatDateRfc822(startTime), timeZoneId);
767        // json.put(JsonTags.COORDINATOR_ACTION_END_TIME, JsonUtils
768        // .formatDateRfc822(endTime), timeZoneId);
769        json.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, getMissingDependencies());
770        json.put(JsonTags.COORDINATOR_ACTION_PUSH_MISSING_DEPS, getPushMissingDependencies());
771        json.put(JsonTags.COORDINATOR_ACTION_EXTERNAL_STATUS, externalStatus);
772        json.put(JsonTags.COORDINATOR_ACTION_TRACKER_URI, trackerUri);
773        json.put(JsonTags.COORDINATOR_ACTION_CONSOLE_URL, consoleUrl);
774        json.put(JsonTags.COORDINATOR_ACTION_ERROR_CODE, errorCode);
775        json.put(JsonTags.COORDINATOR_ACTION_ERROR_MESSAGE, errorMessage);
776        json.put(JsonTags.TO_STRING, toString());
777        return json;
778    }
779
780    /**
781     * Convert a nodes list into a JSONArray.
782     *
783     * @param actions nodes list.
784     * @param timeZoneId time zone to use for dates in the JSON array.
785     * @return the corresponding JSON array.
786     */
787    @SuppressWarnings("unchecked")
788    public static JSONArray toJSONArray(List<CoordinatorActionBean> actions, String timeZoneId) {
789        JSONArray array = new JSONArray();
790        for (CoordinatorActionBean action : actions) {
791            array.add(action.toJSONObject(timeZoneId));
792        }
793        return array;
794    }
795
796    @Override
797    public int hashCode() {
798        final int prime = 31;
799        int result = 1;
800        result = prime * result + ((id == null) ? 0 : id.hashCode());
801        return result;
802    }
803
804    @Override
805    public boolean equals(Object obj) {
806        if (this == obj) {
807            return true;
808        }
809        if (obj == null) {
810            return false;
811        }
812        if (getClass() != obj.getClass()) {
813            return false;
814        }
815        CoordinatorActionBean other = (CoordinatorActionBean) obj;
816        if (id == null) {
817            if (other.id != null) {
818                return false;
819            }
820        }
821        else if (!id.equals(other.id)) {
822            return false;
823        }
824        return true;
825    }
826
827
828}