001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.service;
019
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024import java.util.regex.Matcher;
025import java.util.regex.Pattern;
026
027import org.apache.curator.framework.recipes.leader.LeaderLatch;
028import org.apache.curator.framework.state.ConnectionState;
029import org.apache.curator.x.discovery.ServiceInstance;
030import org.apache.oozie.ErrorCode;
031import org.apache.oozie.client.rest.RestConstants;
032import org.apache.oozie.event.listener.ZKConnectionListener;
033import org.apache.oozie.util.IOUtils;
034import org.apache.oozie.util.Instrumentable;
035import org.apache.oozie.util.Instrumentation;
036import org.apache.oozie.util.ZKUtils;
037
038/**
039 * This Service helps coordinate other Services to prevent duplicate processing of Jobs if there are multiple Oozie Servers.  This
040 * implementation uses ZooKeeper and is designed to correctly deal with multiple Oozie Servers.
041 * <p>
042 * The distributed locks provided by {@link ZKLocksService} will prevent any concurrency issues from occurring if multiple Oozie
043 * Servers try to process the same job at the same time.  However, this will make Oozie slower (more waiting on locks) and will
044 * place additional stress on ZooKeeper and the Database.  By "assigning" different Oozie servers to process different jobs, we can
045 * improve this situation.  This is particularly necessary for Services like the {@link RecoveryService}, which could duplicate jobs
046 * otherwise.  We can assign jobs to servers by doing a mod of the jobs' id and the number of servers.
047 * <p>
048 * The leader server is elected by all of the Oozie servers, so there can only be one at a time.  This is useful for tasks that
049 * require (or are better off) being done by only one server (e.g. database purging).  Note that the leader server isn't a
050 * "traditional leader" in the sense that it doesn't command or have authority over the other servers.  This leader election uses
051 * a znode under /oozie.zookeeper.namespace/ZK_BASE_SERVICES_PATH/ZK_LEADER_PATH (default is /oozie/services/concurrencyleader).
052 */
053public class ZKJobsConcurrencyService extends JobsConcurrencyService implements Service, Instrumentable {
054
055    private ZKUtils zk;
056
057    // This pattern gives us the id number without the extra stuff
058    private static final Pattern ID_PATTERN = Pattern.compile("(\\d{7})-.*");
059
060    private static final String ZK_LEADER_PATH = "concurrencyleader";
061    private static LeaderLatch leaderLatch = null;
062
063    /**
064     * Initialize the zookeeper jobs concurrency service
065     *
066     * @param services services instance.
067     */
068    @Override
069    public void init(Services services) throws ServiceException {
070        super.init(services);
071        try {
072            zk = ZKUtils.register(this);
073            leaderLatch = new LeaderLatch(zk.getClient(), ZKUtils.ZK_BASE_SERVICES_PATH + "/" + ZK_LEADER_PATH, zk.getZKId());
074            leaderLatch.start();
075        }
076        catch (Exception ex) {
077            throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex);
078        }
079    }
080
081    /**
082     * Destroy the zookeeper jobs concurrency service.
083     */
084    @Override
085    public void destroy() {
086        if (leaderLatch != null && ZKConnectionListener.getZKConnectionState() != ConnectionState.LOST) {
087            IOUtils.closeSafely(leaderLatch);
088        }
089        if (zk != null) {
090            zk.unregister(this);
091        }
092        zk = null;
093        super.destroy();
094    }
095
096    /**
097     * Instruments the zk jobs concurrency service.
098     *
099     * @param instr instance to instrument the zookeeper jobs concurrency service to.
100     */
101    @Override
102    public void instrument(Instrumentation instr) {
103        super.instrument(instr);
104    }
105
106    /**
107     * Check to see if this server is the leader server.  This implementation only returns true if this server has been elected by
108     * all of the servers as the leader server.
109     *
110     * @return true if this server is the leader; false if not
111     */
112    @Override
113    public boolean isLeader() {
114        return leaderLatch.hasLeadership();
115    }
116
117    /**
118     * Check to see if jobId should be processed by this server.  This implementation only returns true if the index of this server
119     * in ZooKeeper's list of servers is equal to the id of the job mod the number of servers.
120     *
121     * @param jobId The jobId to check
122     * @return true if this server should process this jobId; false if not
123     */
124    @Override
125    public boolean isJobIdForThisServer(String jobId) {
126        List<ServiceInstance<Map>> oozies = zk.getAllMetaData();
127        int numOozies = oozies.size();
128        int myIndex = zk.getZKIdIndex(oozies);
129        return checkJobIdForServer(jobId, numOozies, myIndex);
130    }
131
132    /**
133     * Filter out any job ids that should not be processed by this server.  This implementation only preserves jobs such that the
134     * index of this server in ZooKeeper's list of servers is equal to the id of the job mod the number of servers.
135     *
136     * @param ids The list of job ids to check
137     * @return a filtered list of job ids that this server should process
138     */
139    @Override
140    public List<String> getJobIdsForThisServer(List<String> ids) {
141        List<String> filteredIds = new ArrayList<String>();
142        List<ServiceInstance<Map>> oozies = zk.getAllMetaData();
143        int numOozies = oozies.size();
144        int myIndex = zk.getZKIdIndex(oozies);
145        for(String id : ids) {
146            if (checkJobIdForServer(id, numOozies, myIndex)) {
147                filteredIds.add(id);
148            }
149        }
150        return filteredIds;
151    }
152
153    /**
154     * Check if the jobId should be processed by the server with index myIndex when there are numOozies servers.
155     *
156     * @param jobId The jobId to check
157     * @param numOozies The number of Oozie servers
158     * @param myIndex The index of the Oozie server
159     * @return true if the jobId should be processed by the server, false if not
160     */
161    private boolean checkJobIdForServer(String jobId, int numOozies, int myIndex) {
162        boolean belongs = true;
163        Matcher m = ID_PATTERN.matcher(jobId);
164        if (m.matches() && m.groupCount() == 1) {
165            String idNumStr = m.group(1);
166            int idNum = Integer.parseInt(idNumStr);
167            belongs = (idNum % numOozies == myIndex);
168        }
169        return belongs;
170    }
171
172    /**
173     * Return a map of instance id to Oozie server URL.  This implementation always returns a map with where the key is the instance
174     * id and the value is the URL of each Oozie server that we can see in the service discovery in ZooKeeper.
175     *
176     * @return A map of Oozie instance ids and URLs
177     */
178    @Override
179    public Map<String, String> getServerUrls() {
180        Map<String, String> urls = new HashMap<String, String>();
181        List<ServiceInstance<Map>> oozies = zk.getAllMetaData();
182        for (ServiceInstance<Map> oozie : oozies) {
183            Map<String, String> metadata = oozie.getPayload();
184            String id = metadata.get(ZKUtils.ZKMetadataKeys.OOZIE_ID);
185            String url = metadata.get(ZKUtils.ZKMetadataKeys.OOZIE_URL);
186            urls.put(id, url);
187        }
188        return urls;
189    }
190
191    /**
192     * Return a map of instance id to Oozie server URL of other servers.  This implementation always returns a map with
193     * where the key is the instance id and the value is the URL of each Oozie server that we can see in the service
194     * discovery in ZooKeeper.
195     *
196     * @return A map of Oozie instance ids and URLs
197     */
198    @Override
199    public Map<String, String> getOtherServerUrls() {
200        Map<String, String> urls = new HashMap<String, String>();
201        List<ServiceInstance<Map>> oozies = zk.getAllMetaData();
202        for (ServiceInstance<Map> oozie : oozies) {
203            Map<String, String> metadata = oozie.getPayload();
204            String id = metadata.get(ZKUtils.ZKMetadataKeys.OOZIE_ID);
205
206            if (id.equals(zk.getZKId())) {
207                continue;
208            }
209            String url = metadata.get(ZKUtils.ZKMetadataKeys.OOZIE_URL);
210            urls.put(id, url);
211        }
212        return urls;
213    }
214
215    /**
216     * Checks if rest request is for all server. By default it's true.
217     *
218     * @param params the HttpRequest param
219     * @return false if allservers=false, else true;
220     */
221    @Override
222    public boolean isAllServerRequest(Map<String, String[]> params) {
223        return params == null || params.get(RestConstants.ALL_SERVER_REQUEST) == null || params.isEmpty()
224                || !params.get(RestConstants.ALL_SERVER_REQUEST)[0].equalsIgnoreCase("false");
225    }
226
227    /**
228     * Return if it is running in HA mode
229     *
230     * @return
231     */
232    @Override
233    public boolean isHighlyAvailableMode() {
234        return true;
235    }
236}