001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.executor.jpa;
019
020import java.sql.Timestamp;
021import java.text.ParseException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.HashMap;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028import java.util.Map.Entry;
029import java.util.regex.Matcher;
030import java.util.regex.Pattern;
031
032import javax.persistence.EntityManager;
033import javax.persistence.Query;
034
035import org.apache.oozie.BundleJobBean;
036import org.apache.oozie.client.BundleJob;
037import org.apache.oozie.client.CoordinatorJob;
038import org.apache.oozie.client.rest.BulkResponseImpl;
039import org.apache.oozie.BulkResponseInfo;
040import org.apache.oozie.CoordinatorActionBean;
041import org.apache.oozie.CoordinatorJobBean;
042import org.apache.oozie.ErrorCode;
043import org.apache.oozie.StringBlob;
044import org.apache.oozie.client.CoordinatorAction;
045import org.apache.oozie.service.Services;
046import org.apache.oozie.util.DateUtils;
047import org.apache.oozie.util.ParamChecker;
048
049/**
050 * The query executor class for bulk monitoring queries i.e. debugging bundle ->
051 * coord actions directly
052 */
053public class BulkJPAExecutor implements JPAExecutor<BulkResponseInfo> {
054    private Map<String, List<String>> bulkFilter;
055    // defaults
056    private int start = 1;
057    private int len = 50;
058    private enum PARAM_TYPE {
059        ID, NAME
060    }
061
062    public BulkJPAExecutor(Map<String, List<String>> bulkFilter, int start, int len) {
063        ParamChecker.notNull(bulkFilter, "bulkFilter");
064        this.bulkFilter = bulkFilter;
065        this.start = start;
066        this.len = len;
067    }
068
069    @Override
070    public String getName() {
071        return "BulkJPAExecutor";
072    }
073
074    @Override
075    public BulkResponseInfo execute(EntityManager em) throws JPAExecutorException {
076        List<BulkResponseImpl> responseList = new ArrayList<BulkResponseImpl>();
077        Map<String, Timestamp> actionTimes = new HashMap<String, Timestamp>();
078
079        try {
080            // Lightweight Query 1 on Bundle level to fetch the bundle job(s)
081            // corresponding to names or ids
082            List<BundleJobBean> bundleBeans = bundleQuery(em);
083
084            // Join query between coordinator job and coordinator action tables
085            // to get entries for specific bundleId only
086            String conditions = actionQuery(em, bundleBeans, actionTimes, responseList);
087
088            // Query to get the count of records
089            long total = countQuery(conditions, em, bundleBeans, actionTimes);
090
091            BulkResponseInfo bulk = new BulkResponseInfo(responseList, start, len, total);
092            return bulk;
093        }
094        catch (Exception e) {
095            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
096        }
097    }
098
099    /**
100     * build the bundle level query to get bundle beans for the specified ids or appnames
101     * @param em
102     * @return List BundleJobBeans
103     * @throws JPAExecutorException
104     */
105    @SuppressWarnings("unchecked")
106    private List<BundleJobBean> bundleQuery(EntityManager em) throws JPAExecutorException {
107        Query q = em.createNamedQuery("BULK_MONITOR_BUNDLE_QUERY");
108        StringBuilder bundleQuery = new StringBuilder(q.toString());
109
110        StringBuilder whereClause = null;
111        List<String> bundles = bulkFilter.get(BulkResponseImpl.BULK_FILTER_BUNDLE);
112        if (bundles != null) {
113            PARAM_TYPE type = getParamType(bundles.get(0), 'B');
114            if (type == PARAM_TYPE.NAME) {
115                whereClause = inClause(bundles, "appName", 'b');
116            }
117            else if (type == PARAM_TYPE.ID) {
118                whereClause = inClause(bundles, "id", 'b');
119            }
120
121            // Query: select <columns> from BundleJobBean b where b.id IN (...) _or_ b.appName IN (...)
122            bundleQuery.append(whereClause.replace(whereClause.indexOf("AND"), whereClause.indexOf("AND") + 3, "WHERE"));
123            List<Object[]> bundleObjs = (List<Object[]>) em.createQuery(bundleQuery.toString()).getResultList();
124            if (bundleObjs.isEmpty()) {
125                throw new JPAExecutorException(ErrorCode.E0603, "No entries found for given bundle(s)");
126            }
127
128            List<BundleJobBean> bundleBeans = new ArrayList<BundleJobBean>();
129            for (Object[] bundleElem : bundleObjs) {
130                bundleBeans.add(constructBundleBean(bundleElem));
131            }
132            return bundleBeans;
133        }
134        return null;
135    }
136
137    /**
138     * Validate and determine whether passed param is job-id or appname
139     * @param id
140     * @param job
141     * @return PARAM_TYPE
142     */
143    private PARAM_TYPE getParamType(String id, char job) {
144        Pattern p = Pattern.compile("\\d{7}-\\d{15}-" + Services.get().getSystemId() + "-" + job);
145        Matcher m = p.matcher(id);
146        if (m.matches()) {
147            return PARAM_TYPE.ID;
148        }
149        return PARAM_TYPE.NAME;
150    }
151
152    /**
153     * Compose the coord action level query comprising bundle id/appname filter and coord action
154     * status filter (if specified) and start-time or nominal-time filter (if specified)
155     * @param em
156     * @param bundles
157     * @param times
158     * @param responseList
159     * @return Query string
160     * @throws ParseException
161     */
162    @SuppressWarnings("unchecked")
163    private String actionQuery(EntityManager em, List<BundleJobBean> bundles,
164            Map<String, Timestamp> times, List<BulkResponseImpl> responseList) throws ParseException {
165        Query q = em.createNamedQuery("BULK_MONITOR_ACTIONS_QUERY");
166        StringBuilder getActions = new StringBuilder(q.toString());
167        int offset = getActions.indexOf("ORDER");
168        StringBuilder conditionClause = new StringBuilder();
169
170        List<String> coords = bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD);
171        // Query: Select <columns> from CoordinatorActionBean a, CoordinatorJobBean c WHERE a.jobId = c.id
172        // AND c.bundleId = :bundleId AND c.appName/id IN (...)
173        if (coords != null) {
174            PARAM_TYPE type = getParamType(coords.get(0), 'C');
175            if (type == PARAM_TYPE.NAME) {
176                conditionClause.append(inClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD), "appName", 'c'));
177            }
178            else if (type == PARAM_TYPE.ID) {
179                conditionClause.append(inClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD), "id", 'c'));
180            }
181        }
182        // Query: Select <columns> from CoordinatorActionBean a, CoordinatorJobBean c WHERE a.jobId = c.id
183        // AND c.bundleId = :bundleId AND c.appName/id IN (...) AND a.statusStr IN (...)
184        conditionClause.append(statusClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_STATUS)));
185        offset = getActions.indexOf("ORDER");
186        getActions.insert(offset - 1, conditionClause);
187
188        // Query: Select <columns> from CoordinatorActionBean a, CoordinatorJobBean c WHERE a.jobId = c.id
189        // AND c.bundleId = :bundleId AND c.appName/id IN (...) AND a.statusStr IN (...)
190        // AND a.createdTimestamp >= startCreated _or_ a.createdTimestamp <= endCreated
191        // AND a.nominalTimestamp >= startNominal _or_ a.nominalTimestamp <= endNominal
192        timesClause(getActions, offset, times);
193        q = em.createQuery(getActions.toString());
194        Iterator<Entry<String, Timestamp>> iter = times.entrySet().iterator();
195        while (iter.hasNext()) {
196            Entry<String, Timestamp> time = iter.next();
197            q.setParameter(time.getKey(), time.getValue());
198        }
199        // pagination
200        q.setFirstResult(start - 1);
201        q.setMaxResults(len);
202        // repeatedly execute above query for each bundle
203        for (BundleJobBean bundle : bundles) {
204            q.setParameter("bundleId", bundle.getId());
205            List<Object[]> response = q.getResultList();
206            for (Object[] r : response) {
207                BulkResponseImpl br = getResponseFromObject(bundle, r);
208                responseList.add(br);
209            }
210        }
211        return q.toString();
212    }
213
214    /**
215     * Get total number of records for use with offset and len in API
216     * @param clause
217     * @param em
218     * @param bundles
219     * @return total count of coord actions
220     */
221    private long countQuery(String clause, EntityManager em, List<BundleJobBean> bundles, Map<String, Timestamp> times) {
222        Query q = em.createNamedQuery("BULK_MONITOR_COUNT_QUERY");
223        StringBuilder getTotal = new StringBuilder(q.toString() + " ");
224        // Query: select COUNT(a) from CoordinatorActionBean a, CoordinatorJobBean c
225        // get entire WHERE clause from above i.e. actionQuery() for all conditions on coordinator job
226        // and action status and times
227        getTotal.append(clause.substring(clause.indexOf("WHERE"), clause.indexOf("ORDER")));
228        int offset = getTotal.indexOf("bundleId");
229        List<String> bundleIds = new ArrayList<String>();
230        for (BundleJobBean bundle : bundles) {
231            bundleIds.add(bundle.getId());
232        }
233        // Query: select COUNT(a) from CoordinatorActionBean a, CoordinatorJobBean c WHERE ...
234        // AND c.bundleId IN (... list of bundle ids) i.e. replace single :bundleId with list
235        getTotal = getTotal.replace(offset - 6, offset + 20, inClause(bundleIds, "bundleId", 'c').toString());
236        q = em.createQuery(getTotal.toString());
237        Iterator<Entry<String, Timestamp>> iter = times.entrySet().iterator();
238        while (iter.hasNext()) {
239            Entry<String, Timestamp> time = iter.next();
240            q.setParameter(time.getKey(), time.getValue());
241        }
242        long total = ((Long) q.getSingleResult()).longValue();
243        return total;
244    }
245
246    // Form the where clause to filter by coordinator appname/id
247    private StringBuilder inClause(List<String> values, String col, char type) {
248        StringBuilder sb = new StringBuilder();
249        boolean firstVal = true;
250        for (String name : nullToEmpty(values)) {
251            if (firstVal) {
252                sb.append(" AND " + type + "." + col + " IN (\'" + name + "\'");
253                firstVal = false;
254            }
255            else {
256                sb.append(",\'" + name + "\'");
257            }
258        }
259        if (!firstVal) {
260            sb.append(") ");
261        }
262        return sb;
263    }
264
265    // Form the where clause to filter by coord action status
266    private StringBuilder statusClause(List<String> statuses) {
267        StringBuilder sb = inClause(statuses, "statusStr", 'a');
268        if (sb.length() == 0) { // statuses was null. adding default
269            sb.append(" AND a.statusStr IN ('KILLED', 'FAILED') ");
270        }
271        return sb;
272    }
273
274    private void timesClause(StringBuilder sb, int offset, Map<String, Timestamp> eachTime) throws ParseException {
275        Timestamp ts = null;
276        List<String> times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH);
277        if (times != null) {
278            ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
279            sb.insert(offset - 1, " AND a.createdTimestamp >= :startCreated");
280            eachTime.put("startCreated", ts);
281        }
282        times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_END_CREATED_EPOCH);
283        if (times != null) {
284            ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
285            sb.insert(offset - 1, " AND a.createdTimestamp <= :endCreated");
286            eachTime.put("endCreated", ts);
287        }
288        times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_START_NOMINAL_EPOCH);
289        if (times != null) {
290            ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
291            sb.insert(offset - 1, " AND a.nominalTimestamp >= :startNominal");
292            eachTime.put("startNominal", ts);
293        }
294        times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_END_NOMINAL_EPOCH);
295        if (times != null) {
296            ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime());
297            sb.insert(offset - 1, " AND a.nominalTimestamp <= :endNominal");
298            eachTime.put("endNominal", ts);
299        }
300    }
301
302    private BulkResponseImpl getResponseFromObject(BundleJobBean bundleBean, Object arr[]) {
303        BulkResponseImpl bean = new BulkResponseImpl();
304        CoordinatorJobBean coordBean = new CoordinatorJobBean();
305        CoordinatorActionBean actionBean = new CoordinatorActionBean();
306        if (arr[0] != null) {
307            actionBean.setId((String) arr[0]);
308        }
309        if (arr[1] != null) {
310            actionBean.setActionNumber((Integer) arr[1]);
311        }
312        if (arr[2] != null) {
313            actionBean.setErrorCode((String) arr[2]);
314        }
315        if (arr[3] != null) {
316            actionBean.setErrorMessage((String) arr[3]);
317        }
318        if (arr[4] != null) {
319            actionBean.setExternalId((String) arr[4]);
320        }
321        if (arr[5] != null) {
322            actionBean.setExternalStatus((String) arr[5]);
323        }
324        if (arr[6] != null) {
325            actionBean.setStatus(CoordinatorAction.Status.valueOf((String) arr[6]));
326        }
327        if (arr[7] != null) {
328            actionBean.setCreatedTime(DateUtils.toDate((Timestamp) arr[7]));
329        }
330        if (arr[8] != null) {
331            actionBean.setNominalTime(DateUtils.toDate((Timestamp) arr[8]));
332        }
333        if (arr[9] != null) {
334            actionBean.setMissingDependenciesBlob((StringBlob) arr[9]);
335        }
336        if (arr[10] != null) {
337            coordBean.setId((String) arr[10]);
338            actionBean.setJobId((String) arr[10]);
339        }
340        if (arr[11] != null) {
341            coordBean.setAppName((String) arr[11]);
342        }
343        if (arr[12] != null) {
344            coordBean.setStatus(CoordinatorJob.Status.valueOf((String) arr[12]));
345        }
346        bean.setBundle(bundleBean);
347        bean.setCoordinator(coordBean);
348        bean.setAction(actionBean);
349        return bean;
350    }
351
352    private BundleJobBean constructBundleBean(Object[] barr) throws JPAExecutorException {
353        BundleJobBean bean = new BundleJobBean();
354        if (barr[0] != null) {
355            bean.setId((String) barr[0]);
356        }
357        else {
358            throw new JPAExecutorException(ErrorCode.E0603,
359                    "bundleId returned by query is null - cannot retrieve bulk results");
360        }
361        if (barr[1] != null) {
362            bean.setAppName((String) barr[1]);
363        }
364        if (barr[2] != null) {
365            bean.setStatus(BundleJob.Status.valueOf((String) barr[2]));
366        }
367        if (barr[3] != null) {
368            bean.setUser((String) barr[3]);
369        }
370        return bean;
371    }
372
373    // null safeguard
374    public static List<String> nullToEmpty(List<String> input) {
375        return input == null ? Collections.<String> emptyList() : input;
376    }
377
378}