001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie; 019 020 import java.io.DataInput; 021 import java.io.DataOutput; 022 import java.io.IOException; 023 import java.sql.Timestamp; 024 import java.util.Date; 025 026 import javax.persistence.Basic; 027 import javax.persistence.Column; 028 import javax.persistence.ColumnResult; 029 import javax.persistence.Entity; 030 import javax.persistence.Lob; 031 import javax.persistence.NamedNativeQueries; 032 import javax.persistence.NamedNativeQuery; 033 import javax.persistence.NamedQueries; 034 import javax.persistence.NamedQuery; 035 import javax.persistence.SqlResultSetMapping; 036 037 import org.apache.hadoop.io.Writable; 038 import org.apache.oozie.client.CoordinatorAction; 039 import org.apache.oozie.client.rest.JsonCoordinatorAction; 040 import org.apache.oozie.util.DateUtils; 041 import org.apache.oozie.util.WritableUtils; 042 import org.apache.openjpa.persistence.jdbc.Index; 043 044 @SqlResultSetMapping( 045 name = "CoordActionJobIdLmt", 046 columns = {@ColumnResult(name = "job_id"), 047 @ColumnResult(name = "min_lmt")}) 048 049 @Entity 050 @NamedQueries({ 051 052 @NamedQuery(name = "UPDATE_COORD_ACTION", query = "update CoordinatorActionBean w set w.actionNumber = :actionNumber, w.actionXml = :actionXml, w.consoleUrl = :consoleUrl, w.createdConf = :createdConf, w.errorCode = :errorCode, w.errorMessage = :errorMessage, w.externalStatus = :externalStatus, w.missingDependencies = :missingDependencies, w.runConf = :runConf, w.timeOut = :timeOut, w.trackerUri = :trackerUri, w.type = :type, w.createdTimestamp = :createdTime, w.externalId = :externalId, w.jobId = :jobId, w.lastModifiedTimestamp = :lastModifiedTime, w.nominalTimestamp = :nominalTime, w.slaXml = :slaXml, w.status = :status where w.id = :id"), 053 054 @NamedQuery(name = "UPDATE_COORD_ACTION_MIN", query = "update CoordinatorActionBean w set w.actionXml = :actionXml, w.missingDependencies = :missingDependencies, w.lastModifiedTimestamp = :lastModifiedTime, w.status = :status where w.id = :id"), 055 // Query to update the action status, pending status and last modified time stamp of a Coordinator action 056 @NamedQuery(name = "UPDATE_COORD_ACTION_STATUS_PENDING_TIME", query = "update CoordinatorActionBean w set w.status =:status, w.pending =:pending, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"), 057 // Update query for InputCheck 058 @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_INPUTCHECK", query = "update CoordinatorActionBean w set w.status = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.actionXml = :actionXml, w.missingDependencies = :missingDependencies where w.id = :id"), 059 // Update query for Push-based missing dependency check 060 @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK", query = "update CoordinatorActionBean w set w.status = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.actionXml = :actionXml, w.pushMissingDependencies = :pushMissingDependencies where w.id = :id"), 061 // Update query for Start 062 @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_START", query = "update CoordinatorActionBean w set w.status =:status, w.lastModifiedTimestamp = :lastModifiedTime, w.runConf = :runConf, w.externalId = :externalId, w.pending = :pending, w.errorCode = :errorCode, w.errorMessage = :errorMessage where w.id = :id"), 063 064 @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_MODIFIED_DATE", query = "update CoordinatorActionBean w set w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"), 065 066 @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId and (a.status = 'SUCCEEDED' OR a.status = 'FAILED' OR a.status= 'KILLED')"), 067 068 @NamedQuery(name = "DELETE_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId"), 069 070 @NamedQuery(name = "DELETE_UNSCHEDULED_ACTION", query = "delete from CoordinatorActionBean a where a.id = :id and (a.status = 'WAITING' OR a.status = 'READY')"), 071 072 // Query used by XTestcase to setup tables 073 @NamedQuery(name = "GET_COORD_ACTIONS", query = "select OBJECT(w) from CoordinatorActionBean w"), 074 // Select query used only by test cases 075 @NamedQuery(name = "GET_COORD_ACTION", query = "select OBJECT(a) from CoordinatorActionBean a where a.id = :id"), 076 077 // Select query used by SLAService on restart 078 @NamedQuery(name = "GET_COORD_ACTION_FOR_SLA", query = "select a.id, a.jobId, a.status, a.externalId, a.lastModifiedTimestamp from CoordinatorActionBean a where a.id = :id"), 079 // Select query used by ActionInfo command 080 @NamedQuery(name = "GET_COORD_ACTION_FOR_INFO", query = "select a.id, a.jobId, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies from CoordinatorActionBean a where a.id = :id"), 081 // Select Query used by Timeout command 082 @NamedQuery(name = "GET_COORD_ACTION_FOR_TIMEOUT", query = "select a.id, a.jobId, a.status, a.runConf, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"), 083 // Select query used by InputCheck command 084 @NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.jobId, a.status, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.id = :id"), 085 // Select query used by CoordActionUpdate command 086 @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.externalId = :externalId"), 087 // Select query used by Check command 088 @NamedQuery(name = "GET_COORD_ACTION_FOR_CHECK", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"), 089 // Select query used by Start command 090 @NamedQuery(name = "GET_COORD_ACTION_FOR_START", query = "select a.id, a.jobId, a.status, a.pending, a.createdConf, a.slaXml, a.actionXml, a.externalId, a.errorMessage, a.errorCode, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"), 091 092 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_FIFO", query = "select a.id, a.jobId, a.status, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp"), 093 094 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_LIFO", query = "select a.id, a.jobId, a.status, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp desc"), 095 096 @NamedQuery(name = "GET_COORD_RUNNING_ACTIONS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'RUNNING' OR a.status='SUBMITTED')"), 097 098 @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"), 099 100 @NamedQuery(name = "GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'WAITING'"), 101 102 @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND (a.status = 'SUSPENDED' OR a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED')"), 103 104 @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_STATUS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND a.status = :status"), 105 106 @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"), 107 // Query to retrieve Coordinator actions sorted by nominal time 108 @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select a.id, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.jobId, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"), 109 // Query to maintain backward compatibility for coord job info command 110 @NamedQuery(name = "GET_ALL_COLS_FOR_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"), 111 // Query to retrieve action id, action status, pending status and external Id of not completed Coordinator actions 112 @NamedQuery(name = "GET_COORD_ACTIONS_NOT_COMPLETED", query = "select a.id, a.status, a.pending, a.externalId, a.pushMissingDependencies, a.nominalTimestamp, a.createdTimestamp, a.jobId from CoordinatorActionBean a where a.jobId = :jobId AND a.status <> 'FAILED' AND a.status <> 'TIMEDOUT' AND a.status <> 'SUCCEEDED' AND a.status <> 'KILLED'"), 113 114 // Query to retrieve action id, action status, pending status and external Id of running Coordinator actions 115 @NamedQuery(name = "GET_COORD_ACTIONS_RUNNING", query = "select a.id, a.status, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId and a.status = 'RUNNING'"), 116 117 // Query to retrieve action id, action status, pending status and external Id of suspended Coordinator actions 118 @NamedQuery(name = "GET_COORD_ACTIONS_SUSPENDED", query = "select a.id, a.status, a.pending, a.externalId, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId and a.status = 'SUSPENDED'"), 119 120 // Query to retrieve count of Coordinator actions which are pending 121 @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending > 0"), 122 123 // Query to retrieve status of Coordinator actions 124 @NamedQuery(name = "GET_COORD_ACTIONS_STATUS", query = "select a.status from CoordinatorActionBean a where a.jobId = :jobId"), 125 126 @NamedQuery(name = "GET_COORD_ACTION_FOR_COORD_JOB_BY_ACTION_NUMBER", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND a.actionNumber = :actionNumber"), 127 128 @NamedQuery(name = "GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME", query = "select a.jobId from CoordinatorActionBean a where a.lastModifiedTimestamp >= :lastModifiedTime"), 129 130 //Used by coordinator store only 131 @NamedQuery(name = "GET_RUNNING_ACTIONS_FOR_COORD_JOB", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'RUNNING'"), 132 133 @NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select a.id from CoordinatorActionBean a where a.status = 'RUNNING' AND a.lastModifiedTimestamp <= :lastModifiedTime"), 134 135 @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", query = "select a.id, a.jobId, a.status, a.externalId, a.pushMissingDependencies from CoordinatorActionBean a where (a.status = 'WAITING' OR a.status = 'SUBMITTED') AND a.lastModifiedTimestamp <= :lastModifiedTime"), 136 137 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select a.id, a.jobId, a.status, a.externalId from CoordinatorActionBean a where a.pending > 0 AND (a.status = 'SUSPENDED' OR a.status = 'KILLED' OR a.status = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"), 138 // Select query used by rerun, requires almost all columns so select * is used 139 @NamedQuery(name = "GET_ACTIONS_FOR_DATES", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"), 140 // Select query used by log 141 @NamedQuery(name = "GET_ACTION_IDS_FOR_DATES", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"), 142 // Select query used by rerun, requires almost all columns so select * is used 143 @NamedQuery(name = "GET_ACTION_FOR_NOMINALTIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.nominalTimestamp = :nominalTime"), 144 145 @NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w) from CoordinatorActionBean w")}) 146 147 @NamedNativeQueries({ 148 149 @NamedNativeQuery(name = "GET_READY_ACTIONS_GROUP_BY_JOBID", query = "select a.job_id as job_id, MIN(a.last_modified_time) as min_lmt from COORD_ACTIONS a where a.status = 'READY' GROUP BY a.job_id HAVING MIN(a.last_modified_time) < ?", resultSetMapping = "CoordActionJobIdLmt") 150 }) 151 public class CoordinatorActionBean extends JsonCoordinatorAction implements 152 Writable { 153 @Basic 154 @Index 155 @Column(name = "job_id") 156 private String jobId; 157 158 @Basic 159 @Index 160 @Column(name = "status") 161 private String status = null; 162 163 @Basic 164 @Index 165 @Column(name = "nominal_time") 166 private java.sql.Timestamp nominalTimestamp = null; 167 168 @Basic 169 @Index 170 @Column(name = "last_modified_time") 171 private java.sql.Timestamp lastModifiedTimestamp = null; 172 173 @Basic 174 @Index 175 @Column(name = "created_time") 176 private java.sql.Timestamp createdTimestamp = null; 177 178 @Basic 179 @Index 180 @Column(name = "rerun_time") 181 private java.sql.Timestamp rerunTimestamp = null; 182 183 @Basic 184 @Index 185 @Column(name = "external_id") 186 private String externalId; 187 188 @Column(name = "sla_xml") 189 @Lob 190 private String slaXml = null; 191 192 @Basic 193 @Column(name = "pending") 194 private int pending = 0; 195 196 public CoordinatorActionBean() { 197 } 198 199 /** 200 * Serialize the coordinator bean to a data output. 201 * 202 * @param dataOutput data output. 203 * @throws IOException thrown if the coordinator bean could not be 204 * serialized. 205 */ 206 @Override 207 public void write(DataOutput dataOutput) throws IOException { 208 WritableUtils.writeStr(dataOutput, getJobId()); 209 WritableUtils.writeStr(dataOutput, getType()); 210 WritableUtils.writeStr(dataOutput, getId()); 211 WritableUtils.writeStr(dataOutput, getCreatedConf()); 212 WritableUtils.writeStr(dataOutput, getStatus().toString()); 213 dataOutput.writeInt(getActionNumber()); 214 WritableUtils.writeStr(dataOutput, getRunConf()); 215 WritableUtils.writeStr(dataOutput, getExternalStatus()); 216 WritableUtils.writeStr(dataOutput, getTrackerUri()); 217 WritableUtils.writeStr(dataOutput, getConsoleUrl()); 218 WritableUtils.writeStr(dataOutput, getErrorCode()); 219 WritableUtils.writeStr(dataOutput, getErrorMessage()); 220 dataOutput.writeLong((getCreatedTime() != null) ? getCreatedTime().getTime() : -1); 221 dataOutput.writeLong((getLastModifiedTime() != null) ? getLastModifiedTime().getTime() : -1); 222 } 223 224 /** 225 * Deserialize a coordinator bean from a data input. 226 * 227 * @param dataInput data input. 228 * @throws IOException thrown if the workflow bean could not be 229 * deserialized. 230 */ 231 @Override 232 public void readFields(DataInput dataInput) throws IOException { 233 setJobId(WritableUtils.readStr(dataInput)); 234 setType(WritableUtils.readStr(dataInput)); 235 setId(WritableUtils.readStr(dataInput)); 236 setCreatedConf(WritableUtils.readStr(dataInput)); 237 setStatus(CoordinatorAction.Status.valueOf(WritableUtils.readStr(dataInput))); 238 setActionNumber(dataInput.readInt()); 239 setRunConf(WritableUtils.readStr(dataInput)); 240 setExternalStatus(WritableUtils.readStr(dataInput)); 241 setTrackerUri(WritableUtils.readStr(dataInput)); 242 setConsoleUrl(WritableUtils.readStr(dataInput)); 243 setErrorCode(WritableUtils.readStr(dataInput)); 244 setErrorMessage(WritableUtils.readStr(dataInput)); 245 long d = dataInput.readLong(); 246 if (d != -1) { 247 setCreatedTime(new Date(d)); 248 } 249 d = dataInput.readLong(); 250 if (d != -1) { 251 setLastModifiedTime(new Date(d)); 252 } 253 } 254 255 @Override 256 public String getJobId() { 257 return this.jobId; 258 } 259 260 @Override 261 public void setJobId(String id) { 262 super.setJobId(id); 263 this.jobId = id; 264 } 265 266 @Override 267 public Status getStatus() { 268 return Status.valueOf(status); 269 } 270 271 /** 272 * Return the status in string 273 * @return 274 */ 275 public String getStatusStr() { 276 return status; 277 } 278 279 @Override 280 public void setStatus(Status status) { 281 super.setStatus(status); 282 this.status = status.toString(); 283 } 284 285 @Override 286 public void setCreatedTime(Date createdTime) { 287 this.createdTimestamp = DateUtils.convertDateToTimestamp(createdTime); 288 super.setCreatedTime(createdTime); 289 } 290 291 public void setRerunTime(Date rerunTime) { 292 this.rerunTimestamp = DateUtils.convertDateToTimestamp(rerunTime); 293 } 294 295 @Override 296 public void setNominalTime(Date nominalTime) { 297 this.nominalTimestamp = DateUtils.convertDateToTimestamp(nominalTime); 298 super.setNominalTime(nominalTime); 299 } 300 301 @Override 302 public void setLastModifiedTime(Date lastModifiedTime) { 303 this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime); 304 super.setLastModifiedTime(lastModifiedTime); 305 } 306 307 @Override 308 public Date getCreatedTime() { 309 return DateUtils.toDate(createdTimestamp); 310 } 311 312 public Timestamp getCreatedTimestamp() { 313 return createdTimestamp; 314 } 315 316 public Date getRerunTime() { 317 return DateUtils.toDate(rerunTimestamp); 318 } 319 320 public Timestamp getRerunTimestamp() { 321 return rerunTimestamp; 322 } 323 324 @Override 325 public Date getLastModifiedTime() { 326 return DateUtils.toDate(lastModifiedTimestamp); 327 } 328 329 public Timestamp getLastModifiedTimestamp() { 330 return lastModifiedTimestamp; 331 } 332 333 @Override 334 public Date getNominalTime() { 335 return DateUtils.toDate(nominalTimestamp); 336 } 337 338 public Timestamp getNominalTimestamp() { 339 return nominalTimestamp; 340 } 341 342 @Override 343 public String getExternalId() { 344 return externalId; 345 } 346 347 @Override 348 public void setExternalId(String externalId) { 349 super.setExternalId(externalId); 350 this.externalId = externalId; 351 } 352 353 public String getSlaXml() { 354 return slaXml; 355 } 356 357 public void setSlaXml(String slaXml) { 358 this.slaXml = slaXml; 359 } 360 361 /** 362 * @return true if in terminal status 363 */ 364 public boolean isTerminalStatus() { 365 boolean isTerminal = true; 366 switch (getStatus()) { 367 case WAITING: 368 case READY: 369 case SUBMITTED: 370 case RUNNING: 371 case SUSPENDED: 372 isTerminal = false; 373 break; 374 default: 375 isTerminal = true; 376 break; 377 } 378 return isTerminal; 379 } 380 381 /** 382 * Return if the action is complete with failure. 383 * 384 * @return if the action is complete with failure. 385 */ 386 public boolean isTerminalWithFailure() { 387 boolean result = false; 388 switch (getStatus()) { 389 case FAILED: 390 case KILLED: 391 case TIMEDOUT: 392 result = true; 393 } 394 return result; 395 } 396 397 /** 398 * Set some actions are in progress for particular coordinator action. 399 * 400 * @param pending set pending to true 401 */ 402 public void setPending(int pending) { 403 this.pending = pending; 404 } 405 406 /** 407 * increment pending and return it 408 * 409 * @return pending 410 */ 411 public int incrementAndGetPending() { 412 this.pending++; 413 return pending; 414 } 415 416 /** 417 * decrement pending and return it 418 * 419 * @return pending 420 */ 421 public int decrementAndGetPending() { 422 this.pending = Math.max(this.pending - 1, 0); 423 return pending; 424 } 425 426 /** 427 * Get some actions are in progress for particular bundle action. 428 * 429 * @return pending 430 */ 431 public int getPending() { 432 return this.pending; 433 } 434 435 /** 436 * Return if the action is pending. 437 * 438 * @return if the action is pending. 439 */ 440 public boolean isPending() { 441 return pending > 0 ? true : false; 442 } 443 }