001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.executor.jpa;
020
021import java.sql.Timestamp;
022import java.text.ParseException;
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Map;
029import java.util.Map.Entry;
030import java.util.regex.Matcher;
031import java.util.regex.Pattern;
032
033import javax.persistence.EntityManager;
034import javax.persistence.Query;
035
036import org.apache.oozie.BulkResponseInfo;
037import org.apache.oozie.BundleJobBean;
038import org.apache.oozie.CoordinatorActionBean;
039import org.apache.oozie.CoordinatorJobBean;
040import org.apache.oozie.ErrorCode;
041import org.apache.oozie.StringBlob;
042import org.apache.oozie.client.BundleJob;
043import org.apache.oozie.client.CoordinatorAction;
044import org.apache.oozie.client.CoordinatorJob;
045import org.apache.oozie.client.rest.BulkResponseImpl;
046import org.apache.oozie.service.Services;
047import org.apache.oozie.util.DateUtils;
048import org.apache.oozie.util.ParamChecker;
049
050/**
051 * The query executor class for bulk monitoring queries i.e. debugging bundle coord actions directly
052 */
053public class BulkJPAExecutor implements JPAExecutor<BulkResponseInfo> {
054    private Map<String, List<String>> bulkFilter;
055    // defaults
056    private int start = 1;
057    private int len = 50;
058    private enum PARAM_TYPE {
059        ID, NAME
060    }
061
062    public BulkJPAExecutor(Map<String, List<String>> bulkFilter, int start, int len) {
063        ParamChecker.notNull(bulkFilter, "bulkFilter");
064        this.bulkFilter = bulkFilter;
065        this.start = start;
066        this.len = len;
067    }
068
069    @Override
070    public String getName() {
071        return "BulkJPAExecutor";
072    }
073
074    @Override
075    public BulkResponseInfo execute(EntityManager em) throws JPAExecutorException {
076        List<BulkResponseImpl> responseList = new ArrayList<BulkResponseImpl>();
077        Map<String, Timestamp> actionTimes = new HashMap<String, Timestamp>();
078
079        try {
080            List<String> coords = bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD);
081            List<String> statuses = bulkFilter.get(BulkResponseImpl.BULK_FILTER_STATUS);
082            List<String> params = new ArrayList<String>();
083
084            // Lightweight Query 1 on Bundle level to fetch the bundle job(s)
085            // corresponding to names or ids
086            List<BundleJobBean> bundleBeans = bundleQuery(em);
087
088            // Join query between coordinator job and coordinator action tables
089            // to get entries for specific bundleId only
090            String conditions = actionQuery(coords, params, statuses, em, bundleBeans, actionTimes, responseList);
091
092            // Query to get the count of records
093            long total = countQuery(statuses, params, conditions, em, bundleBeans, actionTimes);
094
095            BulkResponseInfo bulk = new BulkResponseInfo(responseList, start, len, total);
096            return bulk;
097        }
098        catch (Exception e) {
099            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
100        }
101    }
102
103    /**
104     * build the bundle level query to get bundle beans for the specified ids or appnames
105     * @param em
106     * @return List BundleJobBeans
107     * @throws JPAExecutorException
108     */
109    @SuppressWarnings("unchecked")
110    private List<BundleJobBean> bundleQuery(EntityManager em) throws JPAExecutorException {
111        Query q = em.createNamedQuery("BULK_MONITOR_BUNDLE_QUERY");
112        StringBuilder bundleQuery = new StringBuilder(q.toString());
113
114        StringBuilder whereClause = null;
115        List<String> bundles = bulkFilter.get(BulkResponseImpl.BULK_FILTER_BUNDLE);
116        if (bundles != null) {
117            PARAM_TYPE type = getParamType(bundles.get(0), 'B');
118            if (type == PARAM_TYPE.NAME) {
119                whereClause = inClause(bundles.size(), "appName", 'b', "bundles");
120            }
121            else if (type == PARAM_TYPE.ID) {
122                whereClause = inClause(bundles.size(), "id", 'b', "bundles");
123            }
124
125            // Query: select <columns> from BundleJobBean b where b.id IN (...) _or_ b.appName IN (...)
126            bundleQuery.append(whereClause.replace(whereClause.indexOf("AND"), whereClause.indexOf("AND") + 3, "WHERE"));
127            Query tmp = em.createQuery(bundleQuery.toString());
128
129            fillParameters(tmp, "bundles", bundles);
130
131            List<Object[]> bundleObjs = (List<Object[]>) tmp.getResultList();
132            if (bundleObjs.isEmpty()) {
133                throw new JPAExecutorException(ErrorCode.E0603, "No entries found for given bundle(s)");
134            }
135
136            List<BundleJobBean> bundleBeans = new ArrayList<BundleJobBean>();
137            for (Object[] bundleElem : bundleObjs) {
138                bundleBeans.add(constructBundleBean(bundleElem));
139            }
140            return bundleBeans;
141        }
142        return null;
143    }
144
145    /**
146     * Validate and determine whether passed param is job-id or appname
147     * @param id
148     * @param job
149     * @return PARAM_TYPE
150     */
151    private PARAM_TYPE getParamType(String id, char job) {
152        Pattern p = Pattern.compile("\\d{7}-\\d{15}-" + Services.get().getSystemId() + "-" + job);
153        Matcher m = p.matcher(id);
154        if (m.matches()) {
155            return PARAM_TYPE.ID;
156        }
157        return PARAM_TYPE.NAME;
158    }
159
160    /**
161     * Compose the coord action level query comprising bundle id/appname filter and coord action
162     * status filter (if specified) and start-time or nominal-time filter (if specified)
163     * @param em
164     * @param bundles
165     * @param times
166     * @param responseList
167     * @return Query string
168     * @throws ParseException
169     */
170    @SuppressWarnings("unchecked")
171    private String actionQuery(final List<String> coords, final List<String> params, List<String> statuses, EntityManager em,
172                               List<BundleJobBean> bundles, Map<String, Timestamp> times, List<BulkResponseImpl> responseList)
173                               throws ParseException {
174        Query q = em.createNamedQuery("BULK_MONITOR_ACTIONS_QUERY");
175        StringBuilder getActions = new StringBuilder(q.toString());
176        int offset = getActions.indexOf("ORDER");
177        StringBuilder conditionClause = new StringBuilder();
178
179        // Query: Select <columns> from CoordinatorActionBean a, CoordinatorJobBean c WHERE a.jobId = c.id
180        // AND c.bundleId = :bundleId AND c.appName/id IN (...)
181
182        if (coords != null) {
183            PARAM_TYPE type = getParamType(coords.get(0), 'C');
184            if (type == PARAM_TYPE.NAME) {
185                conditionClause.append(inClause(coords.size(), "appName", 'c', "param"));
186                params.addAll(coords);
187            }
188            else if (type == PARAM_TYPE.ID) {
189                conditionClause.append(inClause(coords.size(), "id", 'c', "param"));
190                params.addAll(coords);
191            }
192        }
193        // Query: Select <columns> from CoordinatorActionBean a, CoordinatorJobBean c WHERE a.jobId = c.id
194        // AND c.bundleId = :bundleId AND c.appName/id IN (...) AND a.statusStr IN (...)
195        conditionClause.append(statusClause(statuses));
196
197        offset = getActions.indexOf("ORDER");
198        getActions.insert(offset - 1, conditionClause);
199
200        // Query: Select <columns> from CoordinatorActionBean a, CoordinatorJobBean c WHERE a.jobId = c.id
201        // AND c.bundleId = :bundleId AND c.appName/id IN (...) AND a.statusStr IN (...)
202        // AND a.createdTimestamp >= startCreated _or_ a.createdTimestamp <= endCreated
203        // AND a.nominalTimestamp >= startNominal _or_ a.nominalTimestamp <= endNominal
204        timesClause(getActions, offset, times);
205        q = em.createQuery(getActions.toString());
206        Iterator<Entry<String, Timestamp>> iter = times.entrySet().iterator();
207        while (iter.hasNext()) {
208            Entry<String, Timestamp> time = iter.next();
209            q.setParameter(time.getKey(), time.getValue());
210        }
211        // pagination
212        q.setFirstResult(start - 1);
213        q.setMaxResults(len);
214
215        if (coords != null) {
216            fillParameters(q, "param",  coords);
217        }
218
219        if (statuses != null) {
220            fillParameters(q, "status", statuses);
221        }
222
223        // repeatedly execute above query for each bundle
224        for (BundleJobBean bundle : bundles) {
225            q.setParameter("bundleId", bundle.getId());
226            List<Object[]> response = q.getResultList();
227            for (Object[] r : response) {
228                BulkResponseImpl br = getResponseFromObject(bundle, r);
229                responseList.add(br);
230            }
231        }
232        return q.toString();
233    }
234
235    /**
236     * Get total number of records for use with offset and len in API
237     * @param clause
238     * @param em
239     * @param bundles
240     * @return total count of coord actions
241     */
242    private long countQuery(List<String> statuses, List<String> params, String clause, EntityManager em,
243                            List<BundleJobBean> bundles, Map<String, Timestamp> times) {
244        Query q = em.createNamedQuery("BULK_MONITOR_COUNT_QUERY");
245        StringBuilder getTotal = new StringBuilder(q.toString() + " ");
246        // Query: select COUNT(a) from CoordinatorActionBean a, CoordinatorJobBean c
247        // get entire WHERE clause from above i.e. actionQuery() for all conditions on coordinator job
248        // and action status and times
249        getTotal.append(clause.substring(clause.indexOf("WHERE"), clause.indexOf("ORDER")));
250        int offset = getTotal.indexOf("bundleId");
251        List<String> bundleIds = new ArrayList<String>();
252        for (BundleJobBean bundle : bundles) {
253            bundleIds.add(bundle.getId());
254        }
255        // Query: select COUNT(a) from CoordinatorActionBean a, CoordinatorJobBean c WHERE ...
256        // AND c.bundleId IN (... list of bundle ids) i.e. replace single :bundleId with list
257        getTotal = getTotal.replace(offset - 6, offset + 20, inClause(bundleIds.size(), "bundleId", 'c', "count").toString());
258        q = em.createQuery(getTotal.toString());
259
260        fillParameters(q, "count", bundleIds);
261
262        if (statuses != null) {
263            fillParameters(q, "status", statuses);
264        }
265
266        if (params != null) {
267            fillParameters(q, "param",  params);
268        }
269
270        Iterator<Entry<String, Timestamp>> iter = times.entrySet().iterator();
271        while (iter.hasNext()) {
272            Entry<String, Timestamp> time = iter.next();
273            q.setParameter(time.getKey(), time.getValue());
274        }
275        long total = ((Long) q.getSingleResult()).longValue();
276        return total;
277    }
278
279    // Form the where clause to filter by coordinator appname/id
280    private StringBuilder inClause(int noOfValues, String col, char type, String paramPrefix) {
281        StringBuilder sb = new StringBuilder();
282        boolean firstVal = true;
283
284        for (int i = 0; i < noOfValues; i++) {
285            if (firstVal) {
286                sb.append(" AND " + type + "." + col + " IN (:" + paramPrefix + i);
287                firstVal = false;
288            }
289            else {
290                sb.append(", :" + paramPrefix + i);
291            }
292        }
293        if (!firstVal) {
294            sb.append(") ");
295        }
296
297        return sb;
298    }
299
300    // Form the where clause to filter by coord action status
301    private StringBuilder statusClause(List<String> statuses) {
302        StringBuilder sb = new StringBuilder();
303
304        if (statuses != null) {
305            sb = inClause(statuses.size(), "statusStr", 'a', "status");
306        }
307
308        if (sb.length() == 0) { // statuses was null. adding default
309            sb.append(" AND a.statusStr IN ('KILLED', 'FAILED') ");
310        }
311
312        return sb;
313    }
314
315    private void timesClause(StringBuilder sb, int offset, Map<String, Timestamp> eachTime) throws ParseException {
316        Timestamp ts = null;
317        List<String> times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH);
318        if (times != null) {
319            ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
320            sb.insert(offset - 1, " AND a.createdTimestamp >= :startCreated");
321            eachTime.put("startCreated", ts);
322        }
323        times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_END_CREATED_EPOCH);
324        if (times != null) {
325            ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
326            sb.insert(offset - 1, " AND a.createdTimestamp <= :endCreated");
327            eachTime.put("endCreated", ts);
328        }
329        times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_START_NOMINAL_EPOCH);
330        if (times != null) {
331            ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
332            sb.insert(offset - 1, " AND a.nominalTimestamp >= :startNominal");
333            eachTime.put("startNominal", ts);
334        }
335        times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_END_NOMINAL_EPOCH);
336        if (times != null) {
337            ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
338            sb.insert(offset - 1, " AND a.nominalTimestamp <= :endNominal");
339            eachTime.put("endNominal", ts);
340        }
341    }
342
343    private BulkResponseImpl getResponseFromObject(BundleJobBean bundleBean, Object arr[]) {
344        BulkResponseImpl bean = new BulkResponseImpl();
345        CoordinatorJobBean coordBean = new CoordinatorJobBean();
346        CoordinatorActionBean actionBean = new CoordinatorActionBean();
347        if (arr[0] != null) {
348            actionBean.setId((String) arr[0]);
349        }
350        if (arr[1] != null) {
351            actionBean.setActionNumber((Integer) arr[1]);
352        }
353        if (arr[2] != null) {
354            actionBean.setErrorCode((String) arr[2]);
355        }
356        if (arr[3] != null) {
357            actionBean.setErrorMessage((String) arr[3]);
358        }
359        if (arr[4] != null) {
360            actionBean.setExternalId((String) arr[4]);
361        }
362        if (arr[5] != null) {
363            actionBean.setExternalStatus((String) arr[5]);
364        }
365        if (arr[6] != null) {
366            actionBean.setStatus(CoordinatorAction.Status.valueOf((String) arr[6]));
367        }
368        if (arr[7] != null) {
369            actionBean.setCreatedTime(DateUtils.toDate((Timestamp) arr[7]));
370        }
371        if (arr[8] != null) {
372            actionBean.setNominalTime(DateUtils.toDate((Timestamp) arr[8]));
373        }
374        if (arr[9] != null) {
375            actionBean.setMissingDependenciesBlob((StringBlob) arr[9]);
376        }
377        if (arr[10] != null) {
378            coordBean.setId((String) arr[10]);
379            actionBean.setJobId((String) arr[10]);
380        }
381        if (arr[11] != null) {
382            coordBean.setAppName((String) arr[11]);
383        }
384        if (arr[12] != null) {
385            coordBean.setStatus(CoordinatorJob.Status.valueOf((String) arr[12]));
386        }
387        bean.setBundle(bundleBean);
388        bean.setCoordinator(coordBean);
389        bean.setAction(actionBean);
390        return bean;
391    }
392
393    private BundleJobBean constructBundleBean(Object[] barr) throws JPAExecutorException {
394        BundleJobBean bean = new BundleJobBean();
395        if (barr[0] != null) {
396            bean.setId((String) barr[0]);
397        }
398        else {
399            throw new JPAExecutorException(ErrorCode.E0603,
400                    "bundleId returned by query is null - cannot retrieve bulk results");
401        }
402        if (barr[1] != null) {
403            bean.setAppName((String) barr[1]);
404        }
405        if (barr[2] != null) {
406            bean.setStatus(BundleJob.Status.valueOf((String) barr[2]));
407        }
408        if (barr[3] != null) {
409            bean.setUser((String) barr[3]);
410        }
411        return bean;
412    }
413
414    private void fillParameters(Query query, String prefix, List<?> values) {
415        for (int i = 0; i < values.size(); i++) {
416            query.setParameter(prefix + i, values.get(i));
417        }
418    }
419
420    // null safeguard
421    public static List<String> nullToEmpty(List<String> input) {
422        return input == null ? Collections.<String> emptyList() : input;
423    }
424
425}