001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie; 019 020import java.io.DataInput; 021import java.io.DataOutput; 022import java.io.IOException; 023import java.sql.Timestamp; 024import java.util.Date; 025 026import javax.persistence.Basic; 027import javax.persistence.Column; 028import javax.persistence.Entity; 029import javax.persistence.Id; 030import javax.persistence.NamedQueries; 031import javax.persistence.NamedQuery; 032import javax.persistence.Table; 033 034import org.apache.hadoop.io.Writable; 035import org.apache.oozie.client.Job.Status; 036import org.apache.oozie.client.rest.JsonBean; 037import org.apache.oozie.util.DateUtils; 038import org.apache.oozie.util.WritableUtils; 039import org.apache.openjpa.persistence.jdbc.Index; 040import org.json.simple.JSONObject; 041 042@Entity 043@Table(name = "BUNDLE_ACTIONS") 044@NamedQueries( { 045 @NamedQuery(name = "DELETE_BUNDLE_ACTION", query = "delete from BundleActionBean w where w.bundleActionId = :bundleActionId"), 046 047 @NamedQuery(name = "UPDATE_BUNDLE_ACTION_PENDING_MODTIME", query = "update BundleActionBean w set w.lastModifiedTimestamp = :lastModifiedTime, w.pending = :pending where w.bundleActionId = :bundleActionId"), 048 049 @NamedQuery(name = "UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME", query = "update BundleActionBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.pending = :pending where w.bundleActionId = :bundleActionId"), 050 051 @NamedQuery(name = "UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID", query = "update BundleActionBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.pending = :pending, w.coordId = :coordId where w.bundleActionId = :bundleActionId"), 052 053 @NamedQuery(name = "GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE", query = "select OBJECT(w) from BundleActionBean w where w.bundleId = :bundleId AND w.statusStr <> 'IGNORED'"), 054 055 @NamedQuery(name = "GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE", query = "select w.coordId, w.statusStr, w.pending from BundleActionBean w where w.bundleId = :bundleId AND w.statusStr <> 'IGNORED'"), 056 057 @NamedQuery(name = "GET_BUNDLE_ACTIONS", query = "select OBJECT(w) from BundleActionBean w"), 058 059 @NamedQuery(name = "GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME", query = "select w.bundleId from BundleActionBean w where w.lastModifiedTimestamp >= :lastModifiedTime"), 060 061 @NamedQuery(name = "GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN", query = "select w.bundleActionId, w.bundleId, w.statusStr, w.coordId, w.coordName from BundleActionBean w where w.pending > 0 AND w.lastModifiedTimestamp <= :lastModifiedTime"), 062 063 @NamedQuery(name = "GET_BUNDLE_ACTION", query = "select OBJECT(w) from BundleActionBean w where w.bundleActionId = :bundleActionId"), 064 065 @NamedQuery(name = "GET_BUNDLE_ACTIONS_COUNT", query = "select count(w) from BundleActionBean w"), 066 067 @NamedQuery(name = "GET_BUNDLE_ACTIONS_COUNT_BY_JOB", query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId"), 068 069 @NamedQuery(name = "GET_BUNDLE_ACTIONS_PENDING_TRUE_COUNT", query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId AND w.pending > 0"), 070 071 @NamedQuery(name = "GET_BUNDLE_ACTIONS_NOT_EQUAL_STATUS_COUNT", query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId AND w.statusStr <> :status"), 072 073 @NamedQuery(name = "GET_BUNDLE_ACTIONS_NOT_TERMINATE_STATUS_COUNT", query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId AND (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' OR w.statusStr = 'RUNNINGWITHERROR' OR w.statusStr = 'SUSPENDED' OR w.statusStr = 'SUSPENDEDWITHERROR' OR w.statusStr = 'PREPSUSPENDED' OR w.statusStr = 'PAUSED' OR w.statusStr = 'PAUSEDWITHERROR' OR w.statusStr = 'PREPPAUSED')"), 074 075 @NamedQuery(name = "GET_BUNDLE_ACTIONS_FAILED_NULL_COORD_COUNT", query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId AND w.statusStr = 'FAILED' AND w.coordId IS NULL"), 076 077 @NamedQuery(name = "GET_BUNDLE_ACTIONS_OLDER_THAN", query = "select OBJECT(w) from BundleActionBean w order by w.lastModifiedTimestamp"), 078 079 @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_BUNDLE", query = "delete from BundleActionBean a where a.bundleId = :bundleId and (a.statusStr = 'SUCCEEDED' OR a.statusStr = 'FAILED' OR a.statusStr= 'KILLED' OR a.statusStr = 'DONEWITHERROR')"), 080 081 @NamedQuery(name = "DELETE_ACTIONS_FOR_BUNDLE", query = "delete from BundleActionBean a where a.bundleId = :bundleId")}) 082public class BundleActionBean implements Writable, JsonBean { 083 084 @Id 085 @Column(name = "bundle_action_id") 086 private String bundleActionId = null; 087 088 @Index 089 @Column(name = "bundle_id") 090 private String bundleId = null; 091 092 @Column(name = "coord_name") 093 private String coordName = null; 094 095 @Basic 096 @Column(name = "coord_id") 097 private String coordId = null; 098 099 @Basic 100 @Column(name = "status") 101 private String statusStr = null; 102 103 @Basic 104 @Column(name = "critical") 105 private int critical = 0; 106 107 @Basic 108 @Column(name = "pending") 109 private int pending = 0; 110 111 @Basic 112 @Column(name = "last_modified_time") 113 private java.sql.Timestamp lastModifiedTimestamp = null; 114 115 /** 116 * bundleActionId to set 117 * 118 * @param bundleActionId the bundleActionId to set 119 */ 120 public void setBundleActionId(String bundleActionId) { 121 this.bundleActionId = bundleActionId; 122 } 123 124 /** 125 * Get the Bundle Action Id. 126 * 127 * @return the bundleActionId 128 */ 129 public String getBundleActionId() { 130 return bundleActionId; 131 } 132 133 /** 134 * Get the BundleId 135 * 136 * @return bundleId 137 */ 138 public String getBundleId() { 139 return bundleId; 140 } 141 142 /** 143 * Set the Bundle Id. 144 * 145 * @param bundleId 146 */ 147 public void setBundleId(String bundleId) { 148 this.bundleId = bundleId; 149 } 150 151 /** 152 * Get the Coordinator name. 153 * 154 * @return coordName 155 */ 156 public String getCoordName() { 157 return coordName; 158 } 159 160 /** 161 * Set the Coordinator name. 162 * 163 * @param coordName 164 */ 165 public void setCoordName(String coordName) { 166 this.coordName = coordName; 167 } 168 169 /** 170 * Get the coordinator Id. 171 * 172 * @return the coordId 173 */ 174 public String getCoordId() { 175 return coordId; 176 } 177 178 /** 179 * Set the coordinator Id. 180 * 181 * @param coordId 182 */ 183 public void setCoordId(String coordId) { 184 this.coordId = coordId; 185 } 186 187 /** 188 * Get the Status of the Bundle Action 189 * 190 * @return status object 191 */ 192 public Status getStatus() { 193 return Status.valueOf(this.statusStr); 194 } 195 196 /** 197 * Get the Status of the Bundle Action 198 * 199 * @return status string 200 */ 201 public String getStatusStr() { 202 return statusStr; 203 } 204 205 /** 206 * Set the Status of the Bundle Action 207 * 208 * @return status string 209 */ 210 public void setStatusStr(String statusStr) { 211 this.statusStr = statusStr; 212 } 213 214 /** 215 * Set the Status of the Bundle Action 216 * 217 * @param val 218 */ 219 public void setStatus(Status val) { 220 this.statusStr = val.toString(); 221 } 222 223 /** 224 * Set Whether this bundle action is critical or not. 225 * 226 * @param critical set critical to true 227 */ 228 public void setCritical() { 229 this.critical = 1; 230 } 231 232 /** 233 * Reseset Whether this bundle action is critical or not. 234 * 235 * @param critical set critical to false 236 */ 237 public void resetCritical() { 238 this.critical = 0; 239 } 240 241 /** 242 * Return if the action is critical. 243 * 244 * @return if the action is critical. 245 */ 246 public boolean isCritical() { 247 return critical == 1 ? true : false; 248 } 249 250 /** 251 * Set some actions are in progress for particular bundle action. 252 * 253 * @param pending set pending to true 254 */ 255 public void setPending(int pending) { 256 this.pending = pending; 257 } 258 259 /** 260 * increment pending and return it 261 * 262 * @return pending 263 */ 264 public int incrementAndGetPending() { 265 this.pending++; 266 return pending; 267 } 268 269 /** 270 * decrement pending and return it 271 * 272 * @return pending 273 */ 274 public int decrementAndGetPending() { 275 this.pending = Math.max(this.pending-1, 0); 276 return pending; 277 } 278 279 /** 280 * Get some actions are in progress for particular bundle action. 281 * 282 * @return pending 283 */ 284 public int getPending() { 285 return this.pending; 286 } 287 288 /** 289 * Return if the action is pending. 290 * 291 * @return if the action is pending. 292 */ 293 public boolean isPending() { 294 return pending > 0 ? true : false; 295 } 296 297 /** 298 * @return true if in terminal status 299 */ 300 public boolean isTerminalStatus() { 301 boolean isTerminal = false; 302 switch (getStatus()) { 303 case SUCCEEDED: 304 case FAILED: 305 case KILLED: 306 case DONEWITHERROR: 307 isTerminal = true; 308 break; 309 default: 310 isTerminal = false; 311 break; 312 } 313 return isTerminal; 314 } 315 316 /** 317 * Set Last modified time. 318 * 319 * @param lastModifiedTimestamp the lastModifiedTimestamp to set 320 */ 321 public void setLastModifiedTimestamp(java.sql.Timestamp lastModifiedTimestamp) { 322 this.lastModifiedTimestamp = lastModifiedTimestamp; 323 } 324 325 /** 326 * Set Last modified time. 327 * 328 * @param lastModifiedTime the lastModifiedTime to set 329 */ 330 public void setLastModifiedTime(Date lastModifiedTime) { 331 this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime); 332 } 333 334 /** 335 * Get Last modified time. 336 * 337 * @return lastModifiedTime 338 */ 339 public Date getLastModifiedTime() { 340 return DateUtils.toDate(lastModifiedTimestamp); 341 } 342 343 /** 344 * Get Last modified time. 345 * 346 * @return lastModifiedTimestamp 347 */ 348 public Timestamp getLastModifiedTimestamp() { 349 return lastModifiedTimestamp; 350 } 351 352 @Override 353 public void write(DataOutput dataOutput) throws IOException { 354 WritableUtils.writeStr(dataOutput, getBundleActionId()); 355 WritableUtils.writeStr(dataOutput, getBundleId()); 356 WritableUtils.writeStr(dataOutput, getCoordName()); 357 WritableUtils.writeStr(dataOutput, getCoordId()); 358 WritableUtils.writeStr(dataOutput, getStatusStr()); 359 dataOutput.writeInt(critical); 360 dataOutput.writeInt(pending); 361 dataOutput.writeLong((getLastModifiedTimestamp() != null) ? getLastModifiedTimestamp().getTime() : -1); 362 } 363 364 @Override 365 public void readFields(DataInput dataInput) throws IOException { 366 setBundleActionId(WritableUtils.readStr(dataInput)); 367 setBundleId(WritableUtils.readStr(dataInput)); 368 setCoordName(WritableUtils.readStr(dataInput)); 369 setCoordId(WritableUtils.readStr(dataInput)); 370 setStatus(Status.valueOf(WritableUtils.readStr(dataInput))); 371 critical = dataInput.readInt(); 372 pending = dataInput.readInt(); 373 long d = dataInput.readLong(); 374 if (d != -1) { 375 setLastModifiedTime(new Date(d)); 376 } 377 } 378 379 @Override 380 public JSONObject toJSONObject() { 381 return null; 382 } 383 384 @Override 385 public JSONObject toJSONObject(String timeZoneId) { 386 return null; 387 } 388}