001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.service; 019 020import java.util.ArrayList; 021import java.util.HashMap; 022import java.util.List; 023import java.util.Map; 024import java.util.regex.Matcher; 025import java.util.regex.Pattern; 026 027import org.apache.curator.framework.recipes.leader.LeaderLatch; 028import org.apache.curator.framework.state.ConnectionState; 029import org.apache.curator.x.discovery.ServiceInstance; 030import org.apache.oozie.ErrorCode; 031import org.apache.oozie.client.rest.RestConstants; 032import org.apache.oozie.event.listener.ZKConnectionListener; 033import org.apache.oozie.util.IOUtils; 034import org.apache.oozie.util.Instrumentable; 035import org.apache.oozie.util.Instrumentation; 036import org.apache.oozie.util.ZKUtils; 037 038/** 039 * This Service helps coordinate other Services to prevent duplicate processing of Jobs if there are multiple Oozie Servers. This 040 * implementation uses ZooKeeper and is designed to correctly deal with multiple Oozie Servers. 041 * <p> 042 * The distributed locks provided by {@link ZKLocksService} will prevent any concurrency issues from occurring if multiple Oozie 043 * Servers try to process the same job at the same time. However, this will make Oozie slower (more waiting on locks) and will 044 * place additional stress on ZooKeeper and the Database. By "assigning" different Oozie servers to process different jobs, we can 045 * improve this situation. This is particularly necessary for Services like the {@link RecoveryService}, which could duplicate jobs 046 * otherwise. We can assign jobs to servers by doing a mod of the jobs' id and the number of servers. 047 * <p> 048 * The leader server is elected by all of the Oozie servers, so there can only be one at a time. This is useful for tasks that 049 * require (or are better off) being done by only one server (e.g. database purging). Note that the leader server isn't a 050 * "traditional leader" in the sense that it doesn't command or have authority over the other servers. This leader election uses 051 * a znode under /oozie.zookeeper.namespace/ZK_BASE_SERVICES_PATH/ZK_LEADER_PATH (default is /oozie/services/concurrencyleader). 052 */ 053public class ZKJobsConcurrencyService extends JobsConcurrencyService implements Service, Instrumentable { 054 055 private ZKUtils zk; 056 057 // This pattern gives us the id number without the extra stuff 058 private static final Pattern ID_PATTERN = Pattern.compile("(\\d{7})-.*"); 059 060 private static final String ZK_LEADER_PATH = "concurrencyleader"; 061 private static LeaderLatch leaderLatch = null; 062 063 /** 064 * Initialize the zookeeper jobs concurrency service 065 * 066 * @param services services instance. 067 */ 068 @Override 069 public void init(Services services) throws ServiceException { 070 super.init(services); 071 try { 072 zk = ZKUtils.register(this); 073 leaderLatch = new LeaderLatch(zk.getClient(), ZKUtils.ZK_BASE_SERVICES_PATH + "/" + ZK_LEADER_PATH, zk.getZKId()); 074 leaderLatch.start(); 075 } 076 catch (Exception ex) { 077 throw new ServiceException(ErrorCode.E1700, ex.getMessage(), ex); 078 } 079 } 080 081 /** 082 * Destroy the zookeeper jobs concurrency service. 083 */ 084 @Override 085 public void destroy() { 086 if (leaderLatch != null) { 087 IOUtils.closeSafely(leaderLatch); 088 } 089 if (zk != null) { 090 zk.unregister(this); 091 } 092 zk = null; 093 super.destroy(); 094 } 095 096 /** 097 * Instruments the zk jobs concurrency service. 098 * 099 * @param instr instance to instrument the zookeeper jobs concurrency service to. 100 */ 101 @Override 102 public void instrument(Instrumentation instr) { 103 super.instrument(instr); 104 } 105 106 /** 107 * Check to see if this server is the leader server. This implementation only returns true if this server has been elected by 108 * all of the servers as the leader server. 109 * 110 * @return true if this server is the leader; false if not 111 */ 112 @Override 113 public boolean isLeader() { 114 return leaderLatch.hasLeadership(); 115 } 116 117 /** 118 * Check to see if jobId should be processed by this server. This implementation only returns true if the index of this server 119 * in ZooKeeper's list of servers is equal to the id of the job mod the number of servers. 120 * 121 * @param jobId The jobId to check 122 * @return true if this server should process this jobId; false if not 123 */ 124 @Override 125 public boolean isJobIdForThisServer(String jobId) { 126 List<ServiceInstance<Map>> oozies = zk.getAllMetaData(); 127 int numOozies = oozies.size(); 128 int myIndex = zk.getZKIdIndex(oozies); 129 return checkJobIdForServer(jobId, numOozies, myIndex); 130 } 131 132 /** 133 * Filter out any job ids that should not be processed by this server. This implementation only preserves jobs such that the 134 * index of this server in ZooKeeper's list of servers is equal to the id of the job mod the number of servers. 135 * 136 * @param ids The list of job ids to check 137 * @return filteredIds a filtered list of job ids that this server should process 138 */ 139 @Override 140 public List<String> getJobIdsForThisServer(List<String> ids) { 141 List<String> filteredIds = new ArrayList<String>(); 142 List<ServiceInstance<Map>> oozies = zk.getAllMetaData(); 143 int numOozies = oozies.size(); 144 int myIndex = zk.getZKIdIndex(oozies); 145 for(String id : ids) { 146 if (checkJobIdForServer(id, numOozies, myIndex)) { 147 filteredIds.add(id); 148 } 149 } 150 return filteredIds; 151 } 152 153 /** 154 * Check if the jobId should be processed by the server with index myIndex when there are numOozies servers. 155 * 156 * @param jobId The jobId to check 157 * @param numOozies The number of Oozie servers 158 * @param myIndex The index of the Oozie server 159 * @return true if the jobId should be processed by the server, false if not 160 */ 161 private boolean checkJobIdForServer(String jobId, int numOozies, int myIndex) { 162 boolean belongs = true; 163 Matcher m = ID_PATTERN.matcher(jobId); 164 if (m.matches() && m.groupCount() == 1) { 165 String idNumStr = m.group(1); 166 int idNum = Integer.parseInt(idNumStr); 167 belongs = (idNum % numOozies == myIndex); 168 } 169 return belongs; 170 } 171 172 /** 173 * Return a map of instance id to Oozie server URL. This implementation always returns a map with where the key is the instance 174 * id and the value is the URL of each Oozie server that we can see in the service discovery in ZooKeeper. 175 * 176 * @return urls A map of Oozie instance ids and URLs 177 */ 178 @Override 179 public Map<String, String> getServerUrls() { 180 Map<String, String> urls = new HashMap<String, String>(); 181 List<ServiceInstance<Map>> oozies = zk.getAllMetaData(); 182 for (ServiceInstance<Map> oozie : oozies) { 183 Map<String, String> metadata = oozie.getPayload(); 184 String id = metadata.get(ZKUtils.ZKMetadataKeys.OOZIE_ID); 185 String url = metadata.get(ZKUtils.ZKMetadataKeys.OOZIE_URL); 186 urls.put(id, url); 187 } 188 return urls; 189 } 190 191 /** 192 * Return a map of instance id to Oozie server URL of other servers. This implementation always returns a map with 193 * where the key is the instance id and the value is the URL of each Oozie server that we can see in the service 194 * discovery in ZooKeeper. 195 * 196 * @return urls A map of Oozie instance ids and URLs 197 */ 198 @Override 199 public Map<String, String> getOtherServerUrls() { 200 Map<String, String> urls = new HashMap<String, String>(); 201 List<ServiceInstance<Map>> oozies = zk.getAllMetaData(); 202 for (ServiceInstance<Map> oozie : oozies) { 203 Map<String, String> metadata = oozie.getPayload(); 204 String id = metadata.get(ZKUtils.ZKMetadataKeys.OOZIE_ID); 205 206 if (id.equals(zk.getZKId())) { 207 continue; 208 } 209 String url = metadata.get(ZKUtils.ZKMetadataKeys.OOZIE_URL); 210 urls.put(id, url); 211 } 212 return urls; 213 } 214 215 /** 216 * Checks if rest request is for all server. By default it's true. 217 * 218 * @param params the HttpRequest param 219 * @return false if allservers=false, else true; 220 */ 221 @Override 222 public boolean isAllServerRequest(Map<String, String[]> params) { 223 return params == null || params.get(RestConstants.ALL_SERVER_REQUEST) == null || params.isEmpty() 224 || !params.get(RestConstants.ALL_SERVER_REQUEST)[0].equalsIgnoreCase("false"); 225 } 226 227 /** 228 * Return if it is running in HA mode 229 * 230 * @return true Return whether or not it is running in HA mode 231 */ 232 @Override 233 public boolean isHighlyAvailableMode() { 234 return true; 235 } 236}