This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie;
019
020 import java.io.DataInput;
021 import java.io.DataOutput;
022 import java.io.IOException;
023 import java.sql.Timestamp;
024 import java.util.Date;
025
026 import javax.persistence.Basic;
027 import javax.persistence.Column;
028 import javax.persistence.ColumnResult;
029 import javax.persistence.Entity;
030 import javax.persistence.Lob;
031 import javax.persistence.NamedNativeQueries;
032 import javax.persistence.NamedNativeQuery;
033 import javax.persistence.NamedQueries;
034 import javax.persistence.NamedQuery;
035 import javax.persistence.SqlResultSetMapping;
036
037 import org.apache.hadoop.io.Writable;
038 import org.apache.oozie.client.CoordinatorAction;
039 import org.apache.oozie.client.rest.JsonCoordinatorAction;
040 import org.apache.oozie.util.DateUtils;
041 import org.apache.oozie.util.WritableUtils;
042 import org.apache.openjpa.persistence.jdbc.Index;
043
044 @SqlResultSetMapping(
045 name = "CoordActionJobIdLmt",
046 columns = {@ColumnResult(name = "job_id"),
047 @ColumnResult(name = "min_lmt")})
048
049 @Entity
050 @NamedQueries({
051
052 @NamedQuery(name = "UPDATE_COORD_ACTION", query = "update CoordinatorActionBean w set w.actionNumber = :actionNumber, w.actionXml = :actionXml, w.consoleUrl = :consoleUrl, w.createdConf = :createdConf, w.errorCode = :errorCode, w.errorMessage = :errorMessage, w.externalStatus = :externalStatus, w.missingDependencies = :missingDependencies, w.runConf = :runConf, w.timeOut = :timeOut, w.trackerUri = :trackerUri, w.type = :type, w.createdTimestamp = :createdTime, w.externalId = :externalId, w.jobId = :jobId, w.lastModifiedTimestamp = :lastModifiedTime, w.nominalTimestamp = :nominalTime, w.slaXml = :slaXml, w.status = :status where w.id = :id"),
053
054 @NamedQuery(name = "UPDATE_COORD_ACTION_MIN", query = "update CoordinatorActionBean w set w.actionXml = :actionXml, w.missingDependencies = :missingDependencies, w.lastModifiedTimestamp = :lastModifiedTime, w.status = :status where w.id = :id"),
055
056 @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId and (a.status = 'SUCCEEDED' OR a.status = 'FAILED' OR a.status= 'KILLED')"),
057
058 @NamedQuery(name = "GET_COORD_ACTIONS", query = "select OBJECT(w) from CoordinatorActionBean w"),
059
060 @NamedQuery(name = "GET_COMPLETED_ACTIONS_OLDER_THAN", query = "select OBJECT(a) from CoordinatorActionBean a where a.createdTimestamp < :createdTime and (a.status = 'SUCCEEDED' OR a.status = 'FAILED' OR a.status = 'KILLED')"),
061
062 @NamedQuery(name = "GET_COORD_ACTION", query = "select OBJECT(a) from CoordinatorActionBean a where a.id = :id"),
063
064 @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select OBJECT(a) from CoordinatorActionBean a where a.externalId = :externalId"),
065
066 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_FIFO", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp"),
067
068 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_LIFO", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp desc"),
069
070 @NamedQuery(name = "GET_COORD_RUNNING_ACTIONS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'RUNNING' OR a.status='SUBMITTED')"),
071
072 @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"),
073
074 @NamedQuery(name = "GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'WAITING'"),
075
076 @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND (a.status = 'SUSPENDED' OR a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED')"),
077
078 @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_STATUS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND a.status = :status"),
079
080 @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId"),
081
082 @NamedQuery(name = "GET_COORD_ACTION_FOR_COORD_JOB_BY_ACTION_NUMBER", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.actionNumber = :actionNumber"),
083
084 @NamedQuery(name = "GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME", query = "select OBJECT(w) from CoordinatorActionBean w where w.lastModifiedTimestamp >= :lastModifiedTime"),
085
086 @NamedQuery(name = "GET_RUNNING_ACTIONS_FOR_COORD_JOB", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'RUNNING'"),
087
088 @NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select OBJECT(a) from CoordinatorActionBean a where a.status = 'RUNNING' AND a.lastModifiedTimestamp <= :lastModifiedTime"),
089
090 @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", query = "select OBJECT(a) from CoordinatorActionBean a where (a.status = 'WAITING' OR a.status = 'SUBMITTED') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
091
092 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select OBJECT(a) from CoordinatorActionBean a where a.pending > 0 AND (a.status = 'SUSPENDED' OR a.status = 'KILLED' OR a.status = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
093
094 @NamedQuery(name = "GET_ACTIONS_FOR_DATES", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"),
095
096 @NamedQuery(name = "GET_ACTION_FOR_NOMINALTIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.nominalTimestamp = :nominalTime"),
097
098 @NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w) from CoordinatorActionBean w")})
099
100 @NamedNativeQueries({
101
102 @NamedNativeQuery(name = "GET_READY_ACTIONS_GROUP_BY_JOBID", query = "select a.job_id as job_id, MIN(a.last_modified_time) as min_lmt from COORD_ACTIONS a where a.status = 'READY' GROUP BY a.job_id HAVING MIN(a.last_modified_time) < ?", resultSetMapping = "CoordActionJobIdLmt")
103 })
104 public class CoordinatorActionBean extends JsonCoordinatorAction implements
105 Writable {
106 @Basic
107 @Index
108 @Column(name = "job_id")
109 private String jobId;
110
111 @Basic
112 @Index
113 @Column(name = "status")
114 private String status = null;
115
116 @Basic
117 @Column(name = "nominal_time")
118 private java.sql.Timestamp nominalTimestamp = null;
119
120 @Basic
121 @Index
122 @Column(name = "last_modified_time")
123 private java.sql.Timestamp lastModifiedTimestamp = null;
124
125 @Basic
126 @Index
127 @Column(name = "created_time")
128 private java.sql.Timestamp createdTimestamp = null;
129
130 @Basic
131 @Index
132 @Column(name = "rerun_time")
133 private java.sql.Timestamp rerunTimestamp = null;
134
135 @Basic
136 @Index
137 @Column(name = "external_id")
138 private String externalId;
139
140 @Column(name = "sla_xml")
141 @Lob
142 private String slaXml = null;
143
144 @Basic
145 @Column(name = "pending")
146 private int pending = 0;
147
148 public CoordinatorActionBean() {
149 }
150
151 /**
152 * Serialize the coordinator bean to a data output.
153 *
154 * @param dataOutput data output.
155 * @throws IOException thrown if the coordinator bean could not be serialized.
156 */
157 public void write(DataOutput dataOutput) throws IOException {
158 WritableUtils.writeStr(dataOutput, getJobId());
159 WritableUtils.writeStr(dataOutput, getType());
160 WritableUtils.writeStr(dataOutput, getId());
161 WritableUtils.writeStr(dataOutput, getCreatedConf());
162 WritableUtils.writeStr(dataOutput, getStatus().toString());
163 dataOutput.writeInt(getActionNumber());
164 WritableUtils.writeStr(dataOutput, getRunConf());
165 WritableUtils.writeStr(dataOutput, getExternalStatus());
166 WritableUtils.writeStr(dataOutput, getTrackerUri());
167 WritableUtils.writeStr(dataOutput, getConsoleUrl());
168 WritableUtils.writeStr(dataOutput, getErrorCode());
169 WritableUtils.writeStr(dataOutput, getErrorMessage());
170 dataOutput.writeLong((getCreatedTime() != null) ? getCreatedTime().getTime() : -1);
171 dataOutput.writeLong((getLastModifiedTime() != null) ? getLastModifiedTime().getTime() : -1);
172 }
173
174 /**
175 * Deserialize a coordinator bean from a data input.
176 *
177 * @param dataInput data input.
178 * @throws IOException thrown if the workflow bean could not be deserialized.
179 */
180 public void readFields(DataInput dataInput) throws IOException {
181 setJobId(WritableUtils.readStr(dataInput));
182 setType(WritableUtils.readStr(dataInput));
183 setId(WritableUtils.readStr(dataInput));
184 setCreatedConf(WritableUtils.readStr(dataInput));
185 setStatus(CoordinatorAction.Status.valueOf(WritableUtils.readStr(dataInput)));
186 setActionNumber(dataInput.readInt());
187 setRunConf(WritableUtils.readStr(dataInput));
188 setExternalStatus(WritableUtils.readStr(dataInput));
189 setTrackerUri(WritableUtils.readStr(dataInput));
190 setConsoleUrl(WritableUtils.readStr(dataInput));
191 setErrorCode(WritableUtils.readStr(dataInput));
192 setErrorMessage(WritableUtils.readStr(dataInput));
193 long d = dataInput.readLong();
194 if (d != -1) {
195 setCreatedTime(new Date(d));
196 }
197 d = dataInput.readLong();
198 if (d != -1) {
199 setLastModifiedTime(new Date(d));
200 }
201 }
202
203 @Override
204 public String getJobId() {
205 return this.jobId;
206 }
207
208 @Override
209 public void setJobId(String id) {
210 super.setJobId(id);
211 this.jobId = id;
212 }
213
214 @Override
215 public Status getStatus() {
216 return Status.valueOf(status);
217 }
218
219 @Override
220 public void setStatus(Status status) {
221 super.setStatus(status);
222 this.status = status.toString();
223 }
224
225 @Override
226 public void setCreatedTime(Date createdTime) {
227 this.createdTimestamp = DateUtils.convertDateToTimestamp(createdTime);
228 super.setCreatedTime(createdTime);
229 }
230
231 public void setRerunTime(Date rerunTime) {
232 this.rerunTimestamp = DateUtils.convertDateToTimestamp(rerunTime);
233 }
234
235 @Override
236 public void setNominalTime(Date nominalTime) {
237 this.nominalTimestamp = DateUtils.convertDateToTimestamp(nominalTime);
238 super.setNominalTime(nominalTime);
239 }
240
241 @Override
242 public void setLastModifiedTime(Date lastModifiedTime) {
243 this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime);
244 super.setLastModifiedTime(lastModifiedTime);
245 }
246
247 @Override
248 public Date getCreatedTime() {
249 return DateUtils.toDate(createdTimestamp);
250 }
251
252 public Timestamp getCreatedTimestamp() {
253 return createdTimestamp;
254 }
255
256 public Date getRerunTime() {
257 return DateUtils.toDate(rerunTimestamp);
258 }
259
260 public Timestamp getRerunTimestamp() {
261 return rerunTimestamp;
262 }
263
264 @Override
265 public Date getLastModifiedTime() {
266 return DateUtils.toDate(lastModifiedTimestamp);
267 }
268
269 public Timestamp getLastModifiedTimestamp() {
270 return lastModifiedTimestamp;
271 }
272
273 @Override
274 public Date getNominalTime() {
275 return DateUtils.toDate(nominalTimestamp);
276 }
277
278 public Timestamp getNominalTimestamp() {
279 return nominalTimestamp;
280 }
281
282 @Override
283 public String getExternalId() {
284 return externalId;
285 }
286
287 @Override
288 public void setExternalId(String externalId) {
289 super.setExternalId(externalId);
290 this.externalId = externalId;
291 }
292
293 public String getSlaXml() {
294 return slaXml;
295 }
296
297 public void setSlaXml(String slaXml) {
298 this.slaXml = slaXml;
299 }
300
301 /**
302 * @return true if in terminal status
303 */
304 public boolean isTerminalStatus() {
305 boolean isTerminal = true;
306 switch (getStatus()) {
307 case WAITING:
308 case READY:
309 case SUBMITTED:
310 case RUNNING:
311 case SUSPENDED:
312 isTerminal = false;
313 break;
314 default:
315 isTerminal = true;
316 break;
317 }
318 return isTerminal;
319 }
320
321 /**
322 * Set some actions are in progress for particular coordinator action.
323 *
324 * @param pending set pending to true
325 */
326 public void setPending(int pending) {
327 this.pending = pending;
328 }
329
330 /**
331 * increment pending and return it
332 *
333 * @return pending
334 */
335 public int incrementAndGetPending() {
336 this.pending++;
337 return pending;
338 }
339
340 /**
341 * decrement pending and return it
342 *
343 * @return pending
344 */
345 public int decrementAndGetPending() {
346 this.pending = Math.max(this.pending-1, 0);
347 return pending;
348 }
349
350 /**
351 * Get some actions are in progress for particular bundle action.
352 *
353 * @return pending
354 */
355 public int getPending() {
356 return this.pending;
357 }
358
359 /**
360 * Return if the action is pending.
361 *
362 * @return if the action is pending.
363 */
364 public boolean isPending() {
365 return pending > 0 ? true : false;
366 }
367 }