001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie; 019 020 import java.io.DataInput; 021 import java.io.DataOutput; 022 import java.io.IOException; 023 import java.sql.Timestamp; 024 import java.util.Date; 025 026 import javax.persistence.Basic; 027 import javax.persistence.Column; 028 import javax.persistence.ColumnResult; 029 import javax.persistence.Entity; 030 import javax.persistence.Lob; 031 import javax.persistence.NamedNativeQueries; 032 import javax.persistence.NamedNativeQuery; 033 import javax.persistence.NamedQueries; 034 import javax.persistence.NamedQuery; 035 import javax.persistence.SqlResultSetMapping; 036 037 import org.apache.hadoop.io.Writable; 038 import org.apache.oozie.client.CoordinatorAction; 039 import org.apache.oozie.client.rest.JsonCoordinatorAction; 040 import org.apache.oozie.util.DateUtils; 041 import org.apache.oozie.util.WritableUtils; 042 import org.apache.openjpa.persistence.jdbc.Index; 043 044 @SqlResultSetMapping( 045 name = "CoordActionJobIdLmt", 046 columns = {@ColumnResult(name = "job_id"), 047 @ColumnResult(name = "min_lmt")}) 048 049 @Entity 050 @NamedQueries({ 051 052 @NamedQuery(name = "UPDATE_COORD_ACTION", query = "update CoordinatorActionBean w set w.actionNumber = :actionNumber, w.actionXml = :actionXml, w.consoleUrl = :consoleUrl, w.createdConf = :createdConf, w.errorCode = :errorCode, w.errorMessage = :errorMessage, w.externalStatus = :externalStatus, w.missingDependencies = :missingDependencies, w.runConf = :runConf, w.timeOut = :timeOut, w.trackerUri = :trackerUri, w.type = :type, w.createdTimestamp = :createdTime, w.externalId = :externalId, w.jobId = :jobId, w.lastModifiedTimestamp = :lastModifiedTime, w.nominalTimestamp = :nominalTime, w.slaXml = :slaXml, w.status = :status where w.id = :id"), 053 054 @NamedQuery(name = "UPDATE_COORD_ACTION_MIN", query = "update CoordinatorActionBean w set w.actionXml = :actionXml, w.missingDependencies = :missingDependencies, w.lastModifiedTimestamp = :lastModifiedTime, w.status = :status where w.id = :id"), 055 // Query to update the action status, pending status and last modified time stamp of a Coordinator action 056 @NamedQuery(name = "UPDATE_COORD_ACTION_STATUS_PENDING_TIME", query = "update CoordinatorActionBean w set w.status =:status, w.pending =:pending, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"), 057 // Update query for InputCheck 058 @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_INPUTCHECK", query = "update CoordinatorActionBean w set w.status = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.actionXml = :actionXml, w.missingDependencies = :missingDependencies where w.id = :id"), 059 // Update query for Start 060 @NamedQuery(name = "UPDATE_COORD_ACTION_FOR_START", query = "update CoordinatorActionBean w set w.status =:status, w.lastModifiedTimestamp = :lastModifiedTime, w.runConf = :runConf, w.externalId = :externalId, w.pending = :pending where w.id = :id"), 061 062 @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId and (a.status = 'SUCCEEDED' OR a.status = 'FAILED' OR a.status= 'KILLED')"), 063 064 // Query used by XTestcase to setup tables 065 @NamedQuery(name = "GET_COORD_ACTIONS", query = "select OBJECT(w) from CoordinatorActionBean w"), 066 // Select query used only by test cases 067 @NamedQuery(name = "GET_COORD_ACTION", query = "select OBJECT(a) from CoordinatorActionBean a where a.id = :id"), 068 069 // Select query used by ActionInfo command 070 @NamedQuery(name = "GET_COORD_ACTION_FOR_INFO", query = "select a.id, a.jobId, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies from CoordinatorActionBean a where a.id = :id"), 071 // Select Query used by Timeout command 072 @NamedQuery(name = "GET_COORD_ACTION_FOR_TIMEOUT", query = "select a.id, a.jobId, a.status, a.runConf, a.pending from CoordinatorActionBean a where a.id = :id"), 073 // Select query used by InputCheck command 074 @NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.jobId, a.status, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.timeOut from CoordinatorActionBean a where a.id = :id"), 075 // Select query used by CoordActionUpdate command 076 @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml from CoordinatorActionBean a where a.externalId = :externalId"), 077 // Select query used by Check command 078 @NamedQuery(name = "GET_COORD_ACTION_FOR_CHECK", query = "select a.id, a.jobId, a.status, a.pending, a.externalId, a.lastModifiedTimestamp, a.slaXml from CoordinatorActionBean a where a.id = :id"), 079 // Select query used by Start command 080 @NamedQuery(name = "GET_COORD_ACTION_FOR_START", query = "select a.id, a.jobId, a.status, a.pending, a.createdConf, a.slaXml, a.actionXml, a.externalId, a.errorMessage, a.errorCode from CoordinatorActionBean a where a.id = :id"), 081 082 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_FIFO", query = "select a.id, a.jobId, a.status, a.pending from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp"), 083 084 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_LIFO", query = "select a.id, a.jobId, a.status, a.pending from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp desc"), 085 086 @NamedQuery(name = "GET_COORD_RUNNING_ACTIONS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'RUNNING' OR a.status='SUBMITTED')"), 087 088 @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"), 089 090 @NamedQuery(name = "GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'WAITING'"), 091 092 @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND (a.status = 'SUSPENDED' OR a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED')"), 093 094 @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_STATUS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND a.status = :status"), 095 096 @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"), 097 // Query to retrieve Coordinator actions sorted by nominal time 098 @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select a.id, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.jobId, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.status, a.lastModifiedTimestamp, a.missingDependencies, a.timeOut from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"), 099 // Query to maintain backward compatibility for coord job info command 100 @NamedQuery(name = "GET_ALL_COLS_FOR_ACTIONS_FOR_COORD_JOB_ORDER_BY_NOMINAL_TIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId order by a.nominalTimestamp"), 101 // Query to retrieve action id, action status, pending status and external Id of not completed Coordinator actions 102 @NamedQuery(name = "GET_COORD_ACTIONS_NOT_COMPLETED", query = "select a.id, a.status, a.pending, a.externalId from CoordinatorActionBean a where a.jobId = :jobId AND a.status <> 'FAILED' AND a.status <> 'TIMEDOUT' AND a.status <> 'SUCCEEDED' AND a.status <> 'KILLED'"), 103 104 // Query to retrieve action id, action status, pending status and external Id of running Coordinator actions 105 @NamedQuery(name = "GET_COORD_ACTIONS_RUNNING", query = "select a.id, a.status, a.pending, a.externalId from CoordinatorActionBean a where a.jobId = :jobId and a.status = 'RUNNING'"), 106 107 // Query to retrieve action id, action status, pending status and external Id of suspended Coordinator actions 108 @NamedQuery(name = "GET_COORD_ACTIONS_SUSPENDED", query = "select a.id, a.status, a.pending, a.externalId from CoordinatorActionBean a where a.jobId = :jobId and a.status = 'SUSPENDED'"), 109 110 // Query to retrieve count of Coordinator actions which are pending 111 @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending > 0"), 112 113 // Query to retrieve status of Coordinator actions which are not pending 114 @NamedQuery(name = "GET_COORD_ACTIONS_STATUS_BY_PENDING_FALSE", query = "select a.status from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0"), 115 116 @NamedQuery(name = "GET_COORD_ACTION_FOR_COORD_JOB_BY_ACTION_NUMBER", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND a.actionNumber = :actionNumber"), 117 118 @NamedQuery(name = "GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME", query = "select a.jobId from CoordinatorActionBean a where a.lastModifiedTimestamp >= :lastModifiedTime"), 119 120 //Used by coordinator store only 121 @NamedQuery(name = "GET_RUNNING_ACTIONS_FOR_COORD_JOB", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'RUNNING'"), 122 123 @NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select a.id from CoordinatorActionBean a where a.status = 'RUNNING' AND a.lastModifiedTimestamp <= :lastModifiedTime"), 124 125 @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", query = "select a.id, a.jobId, a.status, a.externalId from CoordinatorActionBean a where (a.status = 'WAITING' OR a.status = 'SUBMITTED') AND a.lastModifiedTimestamp <= :lastModifiedTime"), 126 127 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select a.id, a.jobId, a.status, a.externalId from CoordinatorActionBean a where a.pending > 0 AND (a.status = 'SUSPENDED' OR a.status = 'KILLED' OR a.status = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"), 128 // Select query used by rerun, requires almost all columns so select * is used 129 @NamedQuery(name = "GET_ACTIONS_FOR_DATES", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"), 130 // Select query used by log 131 @NamedQuery(name = "GET_ACTION_IDS_FOR_DATES", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"), 132 // Select query used by rerun, requires almost all columns so select * is used 133 @NamedQuery(name = "GET_ACTION_FOR_NOMINALTIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.nominalTimestamp = :nominalTime"), 134 135 @NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w) from CoordinatorActionBean w")}) 136 137 @NamedNativeQueries({ 138 139 @NamedNativeQuery(name = "GET_READY_ACTIONS_GROUP_BY_JOBID", query = "select a.job_id as job_id, MIN(a.last_modified_time) as min_lmt from COORD_ACTIONS a where a.status = 'READY' GROUP BY a.job_id HAVING MIN(a.last_modified_time) < ?", resultSetMapping = "CoordActionJobIdLmt") 140 }) 141 public class CoordinatorActionBean extends JsonCoordinatorAction implements 142 Writable { 143 @Basic 144 @Index 145 @Column(name = "job_id") 146 private String jobId; 147 148 @Basic 149 @Index 150 @Column(name = "status") 151 private String status = null; 152 153 @Basic 154 @Index 155 @Column(name = "nominal_time") 156 private java.sql.Timestamp nominalTimestamp = null; 157 158 @Basic 159 @Index 160 @Column(name = "last_modified_time") 161 private java.sql.Timestamp lastModifiedTimestamp = null; 162 163 @Basic 164 @Index 165 @Column(name = "created_time") 166 private java.sql.Timestamp createdTimestamp = null; 167 168 @Basic 169 @Index 170 @Column(name = "rerun_time") 171 private java.sql.Timestamp rerunTimestamp = null; 172 173 @Basic 174 @Index 175 @Column(name = "external_id") 176 private String externalId; 177 178 @Column(name = "sla_xml") 179 @Lob 180 private String slaXml = null; 181 182 @Basic 183 @Column(name = "pending") 184 private int pending = 0; 185 186 public CoordinatorActionBean() { 187 } 188 189 /** 190 * Serialize the coordinator bean to a data output. 191 * 192 * @param dataOutput data output. 193 * @throws IOException thrown if the coordinator bean could not be 194 * serialized. 195 */ 196 @Override 197 public void write(DataOutput dataOutput) throws IOException { 198 WritableUtils.writeStr(dataOutput, getJobId()); 199 WritableUtils.writeStr(dataOutput, getType()); 200 WritableUtils.writeStr(dataOutput, getId()); 201 WritableUtils.writeStr(dataOutput, getCreatedConf()); 202 WritableUtils.writeStr(dataOutput, getStatus().toString()); 203 dataOutput.writeInt(getActionNumber()); 204 WritableUtils.writeStr(dataOutput, getRunConf()); 205 WritableUtils.writeStr(dataOutput, getExternalStatus()); 206 WritableUtils.writeStr(dataOutput, getTrackerUri()); 207 WritableUtils.writeStr(dataOutput, getConsoleUrl()); 208 WritableUtils.writeStr(dataOutput, getErrorCode()); 209 WritableUtils.writeStr(dataOutput, getErrorMessage()); 210 dataOutput.writeLong((getCreatedTime() != null) ? getCreatedTime().getTime() : -1); 211 dataOutput.writeLong((getLastModifiedTime() != null) ? getLastModifiedTime().getTime() : -1); 212 } 213 214 /** 215 * Deserialize a coordinator bean from a data input. 216 * 217 * @param dataInput data input. 218 * @throws IOException thrown if the workflow bean could not be 219 * deserialized. 220 */ 221 @Override 222 public void readFields(DataInput dataInput) throws IOException { 223 setJobId(WritableUtils.readStr(dataInput)); 224 setType(WritableUtils.readStr(dataInput)); 225 setId(WritableUtils.readStr(dataInput)); 226 setCreatedConf(WritableUtils.readStr(dataInput)); 227 setStatus(CoordinatorAction.Status.valueOf(WritableUtils.readStr(dataInput))); 228 setActionNumber(dataInput.readInt()); 229 setRunConf(WritableUtils.readStr(dataInput)); 230 setExternalStatus(WritableUtils.readStr(dataInput)); 231 setTrackerUri(WritableUtils.readStr(dataInput)); 232 setConsoleUrl(WritableUtils.readStr(dataInput)); 233 setErrorCode(WritableUtils.readStr(dataInput)); 234 setErrorMessage(WritableUtils.readStr(dataInput)); 235 long d = dataInput.readLong(); 236 if (d != -1) { 237 setCreatedTime(new Date(d)); 238 } 239 d = dataInput.readLong(); 240 if (d != -1) { 241 setLastModifiedTime(new Date(d)); 242 } 243 } 244 245 @Override 246 public String getJobId() { 247 return this.jobId; 248 } 249 250 @Override 251 public void setJobId(String id) { 252 super.setJobId(id); 253 this.jobId = id; 254 } 255 256 @Override 257 public Status getStatus() { 258 return Status.valueOf(status); 259 } 260 261 @Override 262 public void setStatus(Status status) { 263 super.setStatus(status); 264 this.status = status.toString(); 265 } 266 267 @Override 268 public void setCreatedTime(Date createdTime) { 269 this.createdTimestamp = DateUtils.convertDateToTimestamp(createdTime); 270 super.setCreatedTime(createdTime); 271 } 272 273 public void setRerunTime(Date rerunTime) { 274 this.rerunTimestamp = DateUtils.convertDateToTimestamp(rerunTime); 275 } 276 277 @Override 278 public void setNominalTime(Date nominalTime) { 279 this.nominalTimestamp = DateUtils.convertDateToTimestamp(nominalTime); 280 super.setNominalTime(nominalTime); 281 } 282 283 @Override 284 public void setLastModifiedTime(Date lastModifiedTime) { 285 this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime); 286 super.setLastModifiedTime(lastModifiedTime); 287 } 288 289 @Override 290 public Date getCreatedTime() { 291 return DateUtils.toDate(createdTimestamp); 292 } 293 294 public Timestamp getCreatedTimestamp() { 295 return createdTimestamp; 296 } 297 298 public Date getRerunTime() { 299 return DateUtils.toDate(rerunTimestamp); 300 } 301 302 public Timestamp getRerunTimestamp() { 303 return rerunTimestamp; 304 } 305 306 @Override 307 public Date getLastModifiedTime() { 308 return DateUtils.toDate(lastModifiedTimestamp); 309 } 310 311 public Timestamp getLastModifiedTimestamp() { 312 return lastModifiedTimestamp; 313 } 314 315 @Override 316 public Date getNominalTime() { 317 return DateUtils.toDate(nominalTimestamp); 318 } 319 320 public Timestamp getNominalTimestamp() { 321 return nominalTimestamp; 322 } 323 324 @Override 325 public String getExternalId() { 326 return externalId; 327 } 328 329 @Override 330 public void setExternalId(String externalId) { 331 super.setExternalId(externalId); 332 this.externalId = externalId; 333 } 334 335 public String getSlaXml() { 336 return slaXml; 337 } 338 339 public void setSlaXml(String slaXml) { 340 this.slaXml = slaXml; 341 } 342 343 /** 344 * @return true if in terminal status 345 */ 346 public boolean isTerminalStatus() { 347 boolean isTerminal = true; 348 switch (getStatus()) { 349 case WAITING: 350 case READY: 351 case SUBMITTED: 352 case RUNNING: 353 case SUSPENDED: 354 isTerminal = false; 355 break; 356 default: 357 isTerminal = true; 358 break; 359 } 360 return isTerminal; 361 } 362 363 /** 364 * Set some actions are in progress for particular coordinator action. 365 * 366 * @param pending set pending to true 367 */ 368 public void setPending(int pending) { 369 this.pending = pending; 370 } 371 372 /** 373 * increment pending and return it 374 * 375 * @return pending 376 */ 377 public int incrementAndGetPending() { 378 this.pending++; 379 return pending; 380 } 381 382 /** 383 * decrement pending and return it 384 * 385 * @return pending 386 */ 387 public int decrementAndGetPending() { 388 this.pending = Math.max(this.pending - 1, 0); 389 return pending; 390 } 391 392 /** 393 * Get some actions are in progress for particular bundle action. 394 * 395 * @return pending 396 */ 397 public int getPending() { 398 return this.pending; 399 } 400 401 /** 402 * Return if the action is pending. 403 * 404 * @return if the action is pending. 405 */ 406 public boolean isPending() { 407 return pending > 0 ? true : false; 408 } 409 }