001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.executor.jpa;
019    
020    import java.sql.Timestamp;
021    import java.text.ParseException;
022    import java.util.ArrayList;
023    import java.util.Collections;
024    import java.util.HashMap;
025    import java.util.Iterator;
026    import java.util.List;
027    import java.util.Map;
028    import java.util.Map.Entry;
029    
030    import javax.persistence.EntityManager;
031    import javax.persistence.Query;
032    
033    import org.apache.oozie.BundleJobBean;
034    import org.apache.oozie.client.BundleJob;
035    import org.apache.oozie.client.CoordinatorJob;
036    import org.apache.oozie.client.rest.BulkResponseImpl;
037    import org.apache.oozie.BulkResponseInfo;
038    import org.apache.oozie.CoordinatorActionBean;
039    import org.apache.oozie.CoordinatorJobBean;
040    import org.apache.oozie.ErrorCode;
041    import org.apache.oozie.client.CoordinatorAction;
042    import org.apache.oozie.util.DateUtils;
043    import org.apache.oozie.util.ParamChecker;
044    
045    /**
046     * The query executor class for bulk monitoring queries i.e. debugging bundle ->
047     * coord actions directly
048     */
049    public class BulkJPAExecutor implements JPAExecutor<BulkResponseInfo> {
050        private Map<String, List<String>> bulkFilter;
051        // defaults
052        private int start = 1;
053        private int len = 50;
054    
055        public BulkJPAExecutor(Map<String, List<String>> bulkFilter, int start, int len) {
056            ParamChecker.notNull(bulkFilter, "bulkFilter");
057            this.bulkFilter = bulkFilter;
058            this.start = start;
059            this.len = len;
060        }
061    
062        /*
063         * (non-Javadoc)
064         * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
065         */
066        @Override
067        public String getName() {
068            return "BulkJPAExecutor";
069        }
070    
071        /*
072         * (non-Javadoc)
073         * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
074         */
075        @Override
076        public BulkResponseInfo execute(EntityManager em) throws JPAExecutorException {
077            List<BulkResponseImpl> responseList = new ArrayList<BulkResponseImpl>();
078            Map<String, Timestamp> actionTimes = new HashMap<String, Timestamp>();
079    
080            try {
081                // Lightweight Query 1 on Bundle level to fetch the bundle job
082                // corresponding to name
083                BundleJobBean bundleBean = bundleQuery(em);
084    
085                // Join query between coordinator job and coordinator action tables
086                // to get entries for specific bundleId only
087                String conditions = actionQuery(em, bundleBean, actionTimes, responseList);
088    
089                // Query to get the count of records
090                long total = countQuery(conditions, em, bundleBean, actionTimes);
091    
092                BulkResponseInfo bulk = new BulkResponseInfo(responseList, start, len, total);
093                return bulk;
094            }
095            catch (Exception e) {
096                throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
097            }
098        }
099    
100        @SuppressWarnings("unchecked")
101        private BundleJobBean bundleQuery(EntityManager em) throws JPAExecutorException {
102            BundleJobBean bundleBean = new BundleJobBean();
103            String bundleName = bulkFilter.get(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME).get(0);
104            Query q = em.createNamedQuery("BULK_MONITOR_BUNDLE_QUERY");
105            q.setParameter("appName", bundleName);
106            List<Object[]> bundles = (List<Object[]>) q.getResultList();
107            if (bundles.isEmpty()) {
108                throw new JPAExecutorException(ErrorCode.E0603, "No bundle entries found for bundle name: "
109                        + bundleName);
110            }
111            if (bundles.size() > 1) { // more than one bundles running with same
112                                      // name - ERROR. Fail fast
113                throw new JPAExecutorException(ErrorCode.E0603, "Non-unique bundles present for same bundle name: "
114                        + bundleName);
115            }
116            bundleBean = getBeanForBundleJob(bundles.get(0), bundleName);
117            return bundleBean;
118        }
119    
120        @SuppressWarnings("unchecked")
121        private String actionQuery(EntityManager em, BundleJobBean bundleBean,
122                Map<String, Timestamp> times, List<BulkResponseImpl> responseList) throws ParseException {
123            Query q = em.createNamedQuery("BULK_MONITOR_ACTIONS_QUERY");
124            StringBuilder getActions = new StringBuilder(q.toString());
125            StringBuilder conditionClause = new StringBuilder();
126            conditionClause.append(coordNamesClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD_NAME)));
127            conditionClause.append(statusClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_STATUS)));
128            int offset = getActions.indexOf("ORDER");
129            getActions.insert(offset - 1, conditionClause);
130            timesClause(getActions, offset, times);
131            q = em.createQuery(getActions.toString());
132            Iterator<Entry<String, Timestamp>> iter = times.entrySet().iterator();
133            while (iter.hasNext()) {
134                Entry<String, Timestamp> time = iter.next();
135                q.setParameter(time.getKey(), time.getValue());
136            }
137            q.setParameter("bundleId", bundleBean.getId());
138            // pagination
139            q.setFirstResult(start - 1);
140            q.setMaxResults(len);
141    
142            List<Object[]> response = q.getResultList();
143            for (Object[] r : response) {
144                BulkResponseImpl br = getResponseFromObject(bundleBean, r);
145                responseList.add(br);
146            }
147            return q.toString();
148        }
149    
150        private long countQuery(String clause, EntityManager em, BundleJobBean bundleBean, Map<String, Timestamp> times) {
151            Query q = em.createNamedQuery("BULK_MONITOR_COUNT_QUERY");
152            StringBuilder getTotal = new StringBuilder(q.toString() + " ");
153            getTotal.append(clause.substring(clause.indexOf("WHERE"), clause.indexOf("ORDER")));
154            q = em.createQuery(getTotal.toString());
155            q.setParameter("bundleId", bundleBean.getId());
156            Iterator<Entry<String, Timestamp>> iter = times.entrySet().iterator();
157            while (iter.hasNext()) {
158                Entry<String, Timestamp> time = iter.next();
159                q.setParameter(time.getKey(), time.getValue());
160            }
161            long total = ((Long) q.getSingleResult()).longValue();
162            return total;
163        }
164    
165        // Form the where clause to filter by coordinator names
166        private StringBuilder coordNamesClause(List<String> coordNames) {
167            StringBuilder sb = new StringBuilder();
168            boolean firstVal = true;
169            for (String name : nullToEmpty(coordNames)) {
170                if (firstVal) {
171                    sb.append(" AND c.appName IN (\'" + name + "\'");
172                    firstVal = false;
173                }
174                else {
175                    sb.append(",\'" + name + "\'");
176                }
177            }
178            if (!firstVal) {
179                sb.append(") ");
180            }
181            return sb;
182        }
183    
184        // Form the where clause to filter by coord action status
185        private StringBuilder statusClause(List<String> statuses) {
186            StringBuilder sb = new StringBuilder();
187            boolean firstVal = true;
188            for (String status : nullToEmpty(statuses)) {
189                if (firstVal) {
190                    sb.append(" AND a.status IN (\'" + status + "\'");
191                    firstVal = false;
192                }
193                else {
194                    sb.append(",\'" + status + "\'");
195                }
196            }
197            if (!firstVal) {
198                sb.append(") ");
199            }
200            else { // statuses was null. adding default
201                sb.append(" AND a.status IN ('KILLED', 'FAILED') ");
202            }
203            return sb;
204        }
205    
206        private void timesClause(StringBuilder sb, int offset, Map<String, Timestamp> eachTime) throws ParseException {
207            Timestamp ts = null;
208            List<String> times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH);
209            if (times != null) {
210                ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
211                sb.insert(offset - 1, " AND a.createdTimestamp >= :startCreated");
212                eachTime.put("startCreated", ts);
213            }
214            times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_END_CREATED_EPOCH);
215            if (times != null) {
216                ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
217                sb.insert(offset - 1, " AND a.createdTimestamp <= :endCreated");
218                eachTime.put("endCreated", ts);
219            }
220            times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_START_NOMINAL_EPOCH);
221            if (times != null) {
222                ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
223                sb.insert(offset - 1, " AND a.nominalTimestamp >= :startNominal");
224                eachTime.put("startNominal", ts);
225            }
226            times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_END_NOMINAL_EPOCH);
227            if (times != null) {
228                ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
229                sb.insert(offset - 1, " AND a.nominalTimestamp <= :endNominal");
230                eachTime.put("endNominal", ts);
231            }
232        }
233    
234        private BulkResponseImpl getResponseFromObject(BundleJobBean bundleBean, Object arr[]) {
235            BulkResponseImpl bean = new BulkResponseImpl();
236            CoordinatorJobBean coordBean = new CoordinatorJobBean();
237            CoordinatorActionBean actionBean = new CoordinatorActionBean();
238            if (arr[0] != null) {
239                actionBean.setId((String) arr[0]);
240            }
241            if (arr[1] != null) {
242                actionBean.setActionNumber((Integer) arr[1]);
243            }
244            if (arr[2] != null) {
245                actionBean.setErrorCode((String) arr[2]);
246            }
247            if (arr[3] != null) {
248                actionBean.setErrorMessage((String) arr[3]);
249            }
250            if (arr[4] != null) {
251                actionBean.setExternalId((String) arr[4]);
252            }
253            if (arr[5] != null) {
254                actionBean.setExternalStatus((String) arr[5]);
255            }
256            if (arr[6] != null) {
257                actionBean.setStatus(CoordinatorAction.Status.valueOf((String) arr[6]));
258            }
259            if (arr[7] != null) {
260                actionBean.setCreatedTime(DateUtils.toDate((Timestamp) arr[7]));
261            }
262            if (arr[8] != null) {
263                actionBean.setNominalTime(DateUtils.toDate((Timestamp) arr[8]));
264            }
265            if (arr[9] != null) {
266                actionBean.setMissingDependencies((String) arr[9]);
267            }
268            if (arr[10] != null) {
269                coordBean.setId((String) arr[10]);
270                actionBean.setJobId((String) arr[10]);
271            }
272            if (arr[11] != null) {
273                coordBean.setAppName((String) arr[11]);
274            }
275            if (arr[12] != null) {
276                coordBean.setStatus(CoordinatorJob.Status.valueOf((String) arr[12]));
277            }
278            bean.setBundle(bundleBean);
279            bean.setCoordinator(coordBean);
280            bean.setAction(actionBean);
281            return bean;
282        }
283    
284        private BundleJobBean getBeanForBundleJob(Object[] barr, String name) throws JPAExecutorException {
285            BundleJobBean bean = new BundleJobBean();
286            if (barr[0] != null) {
287                bean.setId((String) barr[0]);
288            }
289            else {
290                throw new JPAExecutorException(ErrorCode.E0603,
291                        "bundleId returned by query is null - cannot retrieve bulk results");
292            }
293            bean.setAppName(name);
294            if (barr[1] != null) {
295                bean.setStatus(BundleJob.Status.valueOf((String) barr[1]));
296            }
297            return bean;
298        }
299    
300        // null safeguard
301        public static List<String> nullToEmpty(List<String> input) {
302            return input == null ? Collections.<String> emptyList() : input;
303        }
304    
305    }