001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.executor.jpa.sla;
020
021import java.sql.Timestamp;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.LinkedHashMap;
025import java.util.List;
026import java.util.Map;
027
028import javax.persistence.EntityManager;
029import javax.persistence.Query;
030
031import org.apache.oozie.ErrorCode;
032import org.apache.oozie.client.event.SLAEvent;
033import org.apache.oozie.client.event.SLAEvent.EventStatus;
034import org.apache.oozie.client.event.SLAEvent.SLAStatus;
035import org.apache.oozie.executor.jpa.JPAExecutor;
036import org.apache.oozie.executor.jpa.JPAExecutorException;
037import org.apache.oozie.sla.SLASummaryBean;
038import org.apache.oozie.util.XLog;
039
040/**
041 * Load the list of SLASummaryBean (for dashboard) and return the list.
042 */
043public class SLASummaryGetForFilterJPAExecutor implements JPAExecutor<List<SLASummaryBean>> {
044
045    private static final String selectStr = "SELECT OBJECT(s) FROM SLASummaryBean s WHERE ";
046    private static final String bundleIdQuery = "SELECT a.coordId FROM BundleActionBean a WHERE a.bundleId=:bundleId";
047    private static final String bundleNameQuery = "SELECT a.coordId FROM BundleActionBean a WHERE a.bundleId in "
048            + "(SELECT b.id from BundleJobBean b WHERE b.appName=:appName)";
049    private SLASummaryFilter filter;
050    private int numMaxResults;
051    private XLog LOG = XLog.getLog(getClass());
052
053
054    public SLASummaryGetForFilterJPAExecutor(SLASummaryFilter filter, int numMaxResults) {
055        this.filter = filter;
056        this.numMaxResults = numMaxResults;
057    }
058
059    @Override
060    public String getName() {
061        return "SLASummaryGetForFilterJPAExecutor";
062    }
063
064    @SuppressWarnings("unchecked")
065    @Override
066    public List<SLASummaryBean> execute(EntityManager em) throws JPAExecutorException {
067        List<SLASummaryBean> ssBean = null;
068        StringBuilder sb = new StringBuilder(selectStr);
069        Map<String, Object> queryParams = new LinkedHashMap<String, Object>();
070        boolean firstCondition = true;
071        boolean jobExists = true;
072        if (filter.getJobId() != null) {
073            firstCondition = false;
074            if (filter.getParentId() != null) {
075                sb.append("(s.jobId = :jobId OR s.parentId = :parentId)");
076                queryParams.put("jobId", filter.getJobId());
077                queryParams.put("parentId", filter.getParentId());
078            }
079            else {
080                sb.append("s.jobId = :jobId");
081                queryParams.put("jobId", filter.getJobId());
082            }
083        }
084        if (filter.getParentId() != null && filter.getJobId() == null) {
085            firstCondition = false;
086            sb.append("s.parentId = :parentId");
087            queryParams.put("parentId", filter.getParentId());
088        }
089        if (filter.getBundleId() != null || filter.getBundleName() != null) {
090            firstCondition = false;
091            Query bq;
092            List<Object> returnList;
093            try {
094                if (filter.getBundleId() != null) {
095                    bq = em.createQuery(bundleIdQuery);
096                    bq.setParameter("bundleId", filter.getBundleId());
097                }
098                else {
099                    bq = em.createQuery(bundleNameQuery);
100                    bq.setParameter("appName", filter.getBundleName());
101                }
102                bq.setMaxResults(numMaxResults);
103                returnList = (List<Object>) bq.getResultList();
104            }
105            catch (Exception e) {
106                throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
107            }
108            StringBuilder sub = null;
109            int ind = 0;
110            if(returnList.size() == 0) {
111                jobExists = false;
112            }
113            for (Object obj : returnList) {
114                String coordId = (String) obj;
115                if (sub == null) {
116                    sub = new StringBuilder();
117                    sub.append("s.parentId in (:parentId").append(ind);
118                }
119                else {
120                    sub.append(",:parentId").append(ind);
121                }
122                queryParams.put("parentId" + ind, coordId);
123                ind++;
124            }
125            if(sub != null) {
126                sub.append(")");
127                sb.append(sub.toString());
128            }
129        }
130        if (filter.getAppName() != null) {
131            if (firstCondition ){
132                firstCondition = false;
133            } else {
134                sb.append(" AND ");
135            }
136            sb.append("s.appName = :appName");
137            queryParams.put("appName", filter.getAppName());
138        }
139        if (filter.getNominalStart() != null) {
140            if (firstCondition) {
141                firstCondition = false;
142            }
143            else {
144                sb.append(" AND ");
145            }
146            sb.append("s.nominalTimeTS >= :nominalTimeStart");
147            queryParams.put("nominalTimeStart", new Timestamp(filter.getNominalStart().getTime()));
148        }
149
150        if (filter.getNominalEnd() != null) {
151            if (firstCondition) {
152                firstCondition = false;
153            }
154            else {
155                sb.append(" AND ");
156            }
157            sb.append("s.nominalTimeTS <= :nominalTimeEnd");
158            queryParams.put("nominalTimeEnd", new Timestamp(filter.getNominalEnd().getTime()));
159        }
160
161        if (filter.getEventStatus() != null) {
162            processEventStatusFilter(filter, queryParams,sb,firstCondition);
163        }
164
165        if (filter.getSLAStatus() != null) {
166            StringBuilder sub = null;
167            int ind = 0;
168            if (firstCondition) {
169                firstCondition = false;
170            }
171            else {
172                sb.append(" AND ");
173            }
174            for (SLAStatus status : filter.getSLAStatus()) {
175                if (sub == null) {
176                    sub = new StringBuilder();
177                    sub.append("s.slaStatus in (:slaStatus").append(ind);
178                }
179                else {
180                    sub.append(",:slaStatus").append(ind);
181                }
182                queryParams.put("slaStatus" + ind, status.toString());
183                ind++;
184            }
185            if(sub != null) {
186                sub.append(")");
187                sb.append(sub.toString());
188            }
189        }
190
191        if (jobExists) {
192            sb.append(" ORDER BY s.nominalTimeTS");
193            LOG.debug("Query String: " + sb.toString());
194            try {
195                Query q = em.createQuery(sb.toString());
196                for (Map.Entry<String, Object> entry : queryParams.entrySet()) {
197                    q.setParameter(entry.getKey(), entry.getValue());
198                }
199                q.setMaxResults(numMaxResults);
200                ssBean = (List<SLASummaryBean>) q.getResultList();
201            }
202            catch (Exception e) {
203                throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
204            }
205        }
206        return ssBean;
207    }
208
209    private void processEventStatusFilter(SLASummaryFilter filter, Map<String, Object> queryParams, StringBuilder sb,
210            boolean firstCondition) {
211        if (firstCondition) {
212            firstCondition = false;
213        }
214        else {
215            sb.append(" AND ");
216        }
217        List<EventStatus> eventStatusList = filter.getEventStatus();
218        int ind = 0;
219        Timestamp currentTime = new Timestamp(new Date().getTime());
220        for (EventStatus status : eventStatusList) {
221            if (ind > 0) {
222                sb.append(" OR ");
223            }
224            if (status.equals(EventStatus.START_MET)) {
225                sb.append("(s.expectedStartTS IS NOT NULL AND s.actualStartTS IS NOT NULL ").append(
226                        " AND s.expectedStartTS >= s.actualStartTS)");
227            }
228            else if (status.equals(EventStatus.START_MISS)) {
229                sb.append("((s.expectedStartTS IS NOT NULL AND s.actualStartTS IS NOT NULL ")
230                        .append(" AND s.expectedStartTS <= s.actualStartTS) ")
231                        .append("OR (s.expectedStartTS IS NOT NULL AND s.actualStartTS IS NULL ")
232                        .append(" AND s.expectedStartTS <= :currentTimeStamp))");
233                queryParams.put("currentTimeStamp",currentTime);
234            }
235            else if (status.equals(EventStatus.DURATION_MET)) {
236                sb.append("(s.expectedDuration <> -1 AND s.actualDuration <> -1 ").append(
237                        " AND s.expectedDuration >= s.actualDuration) ");
238            }
239
240            else if (status.equals(EventStatus.DURATION_MISS)) {
241                sb.append("((s.expectedDuration <> -1 AND s.actualDuration <> -1 ")
242                        .append("AND s.expectedDuration < s.actualDuration) ")
243                        .append("OR s.eventStatus = 'DURATION_MISS')");
244            }
245            else if (status.equals(EventStatus.END_MET)) {
246                sb.append("(s.expectedEndTS IS NOT NULL AND s.actualEndTS IS NOT NULL ").append(
247                        " AND s.expectedEndTS <= s.actualEndTS) ");
248            }
249            else if (status.equals(EventStatus.END_MISS)) {
250                sb.append("((s.expectedEndTS IS NOT NULL AND s.actualEndTS IS NOT NULL ")
251                        .append("AND s.expectedEndTS <= s.actualEndTS) ")
252                        .append("OR (s.expectedEndTS IS NOT NULL AND s.actualEndTS IS NULL ")
253                        .append("AND s.expectedEndTS <= :currentTimeStamp))");
254                queryParams.put("currentTimeStamp",currentTime);
255            }
256            ind++;
257        }
258    }
259
260    public static class SLASummaryFilter {
261
262        private String appName;
263        private String jobId;
264        private String parentId;
265        private String bundleId;
266        private String bundleName;
267        private List<SLAEvent.EventStatus> eventStatus;
268        private List<SLAEvent.SLAStatus> slaStatus;
269        private static String EventStatusSep = ",";
270        private static String SLAStatusSep = ",";
271        private Date nominalStart;
272        private Date nominalEnd;
273
274        public SLASummaryFilter() {
275        }
276
277        public String getAppName() {
278            return appName;
279        }
280
281        public void setAppName(String appName) {
282            this.appName = appName;
283        }
284
285        public String getJobId() {
286            return jobId;
287        }
288
289        public void setJobId(String jobId) {
290            this.jobId = jobId;
291        }
292
293        public String getParentId() {
294            return parentId;
295        }
296
297        public void setParentId(String parentId) {
298            this.parentId = parentId;
299        }
300
301        public Date getNominalStart() {
302            return nominalStart;
303        }
304
305        public void setNominalStart(Date nominalStart) {
306            this.nominalStart = nominalStart;
307        }
308
309        public Date getNominalEnd() {
310            return nominalEnd;
311        }
312
313        public void setNominalEnd(Date nominalEnd) {
314            this.nominalEnd = nominalEnd;
315        }
316
317        public String getBundleId(){
318            return this.bundleId;
319        }
320
321        public void setBundleId(String bundleId) {
322            this.bundleId = bundleId;
323        }
324
325        public String getBundleName(){
326            return this.bundleName;
327        }
328
329        public void setBundleName(String name){
330            this.bundleName = name;
331        }
332
333        public List<EventStatus> getEventStatus() {
334            return this.eventStatus;
335        }
336
337        public void setEventStatus(String str) {
338            if (this.eventStatus == null) {
339                this.eventStatus = new ArrayList<EventStatus>();
340            }
341            String[] statusArr = str.split(EventStatusSep);
342            for (String s : statusArr) {
343                this.eventStatus.add(SLAEvent.EventStatus.valueOf(s));
344            }
345        }
346
347        public List<SLAStatus> getSLAStatus() {
348            return this.slaStatus;
349        }
350
351        public void setSLAStatus(String str) {
352            if (this.slaStatus == null) {
353                this.slaStatus = new ArrayList<SLAStatus>();
354            }
355            String[] statusArr = str.split(SLAStatusSep);
356            for (String s : statusArr) {
357                this.slaStatus.add(SLAEvent.SLAStatus.valueOf(s));
358            }
359        }
360    }
361
362}