/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.util;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.DeserializationConfig;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;

import com.google.common.base.Preconditions;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceInstanceBuilder;
import org.apache.curator.x.discovery.ServiceType;
import org.apache.curator.x.discovery.UriSpec;
import org.apache.curator.x.discovery.details.InstanceSerializer;

// TODO: Workaround for CURATOR-5 (https://issues.apache.org/jira/browse/CURATOR-5)
// Remove this class (code from pull request listed on JIRA) and use regular JsonInstanceSerializer once fixed
// (Otherwise we can't properly serialize objects for the ZK Service Discovery)
/**
 * JSON based {@link InstanceSerializer} that writes a {@link ServiceInstance} with an {@link ObjectMapper} and
 * reads it back field by field from the parsed JSON tree.
 *
 * @param <T> type of the payload carried by the service instance
 */
public class FixedJsonInstanceSerializer<T> implements InstanceSerializer<T>
{

    private final ObjectMapper mMapper;
    private final Class<T> mPayloadClass;

    /**
     * Creates a serializer backed by a new {@link ObjectMapper}.
     *
     * @param payloadClass
     *            used to validate payloads when deserializing
     */
    public FixedJsonInstanceSerializer(final Class<T> payloadClass) {
        this(payloadClass, new ObjectMapper());
    }

    /**
     * Creates a serializer backed by the given mapper.
     * <p>
     * Note that the supplied mapper itself is reconfigured: {@code FAIL_ON_UNKNOWN_PROPERTIES} is switched off
     * on it.
     *
     * @param pPayloadClass class the "payload" field is mapped to when deserializing
     * @param pMapper mapper used for both serialization and deserialization
     */
    public FixedJsonInstanceSerializer(final Class<T> pPayloadClass, final ObjectMapper pMapper) {
        mPayloadClass = pPayloadClass;
        mMapper = pMapper;
        mMapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    }

    /**
     * Writes the given instance as JSON using the configured mapper.
     *
     * @param pInstance instance to serialize
     * @return the JSON document as bytes
     * @throws Exception if the mapper fails to write the instance
     */
    @Override
    public byte[] serialize(final ServiceInstance<T> pInstance) throws Exception {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        mMapper.writeValue(out, pInstance);
        return out.toByteArray();
    }

    /**
     * Returns the text value of a field.
     *
     * @param pNode node holding the field; must not be null
     * @param pFieldName name of the field; must not be null
     * @return the field's text value, or {@code null} when the field is absent (non-text nodes yield whatever
     *         {@code getTextValue()} reports for them)
     */
    private String getTextField(final JsonNode pNode, final String pFieldName) {
        Preconditions.checkNotNull(pNode);
        Preconditions.checkNotNull(pFieldName);
        final JsonNode field = pNode.get(pFieldName);
        if (field == null) {
            return null;
        }
        return field.getTextValue();
    }

    /**
     * Returns the value of a numeric field as an {@link Integer}.
     *
     * @param pNode node holding the field; must not be null
     * @param pFieldName name of the field; must not be null
     * @return the field's int value, or {@code null} when the field is absent or not a number
     */
    private Integer getIntegerField(final JsonNode pNode, final String pFieldName) {
        Preconditions.checkNotNull(pNode);
        Preconditions.checkNotNull(pFieldName);
        final JsonNode field = pNode.get(pFieldName);
        if (field == null || !field.isNumber()) {
            return null;
        }
        return field.getIntValue();
    }

    /**
     * Returns the value of an integral field as a {@link Long}.
     *
     * @param pNode node holding the field; must not be null
     * @param pFieldName name of the field; must not be null
     * @return the field's long value, or {@code null} when the field is absent or is neither an int nor a long
     */
    private Long getLongField(final JsonNode pNode, final String pFieldName) {
        Preconditions.checkNotNull(pNode);
        Preconditions.checkNotNull(pFieldName);
        final JsonNode field = pNode.get(pFieldName);
        // Integral values small enough for 32 bits are parsed as int nodes, for which isLong() is false;
        // accept both so that small values are not silently dropped.
        if (field == null || !(field.isInt() || field.isLong())) {
            return null;
        }
        return field.getLongValue();
    }

    /**
     * Maps an object-valued field to an instance of the given class.
     *
     * @param pNode node holding the field; must not be null
     * @param pFieldName name of the field; must not be null
     * @param pObjectClass class to map the field to; must not be null
     * @return the mapped object, or {@code null} when the field is absent or not a JSON object
     * @throws JsonParseException propagated from the mapper
     * @throws JsonMappingException propagated from the mapper
     * @throws IOException propagated from the mapper
     */
    private <O> O getObject(final JsonNode pNode, final String pFieldName, final Class<O> pObjectClass)
            throws JsonParseException, JsonMappingException, IOException {
        Preconditions.checkNotNull(pNode);
        Preconditions.checkNotNull(pFieldName);
        Preconditions.checkNotNull(pObjectClass);
        final JsonNode field = pNode.get(pFieldName);
        if (field == null || !field.isObject()) {
            return null;
        }
        return mMapper.readValue(field, pObjectClass);
    }

    /**
     * Parses the given JSON document and rebuilds a {@link ServiceInstance} from it.
     * <p>
     * Each known field is applied to the builder only when it is present with the expected JSON type; for any
     * other field the builder keeps whatever value it started with.
     *
     * @param pBytes JSON document to parse
     * @return the rebuilt service instance
     * @throws Exception if the document cannot be parsed or mapped, or the builder cannot be created
     */
    @Override
    public ServiceInstance<T> deserialize(final byte[] pBytes) throws Exception {
        final ByteArrayInputStream bais = new ByteArrayInputStream(pBytes);
        final JsonNode rootNode = mMapper.readTree(bais);
        final ServiceInstanceBuilder<T> builder = ServiceInstance.builder();

        final String address = getTextField(rootNode, "address");
        if (address != null) {
            builder.address(address);
        }

        final String id = getTextField(rootNode, "id");
        if (id != null) {
            builder.id(id);
        }

        final String name = getTextField(rootNode, "name");
        if (name != null) {
            builder.name(name);
        }

        final Integer port = getIntegerField(rootNode, "port");
        if (port != null) {
            builder.port(port);
        }

        final Integer sslPort = getIntegerField(rootNode, "sslPort");
        if (sslPort != null) {
            builder.sslPort(sslPort);
        }

        final Long registrationTimeUTC = getLongField(rootNode, "registrationTimeUTC");
        if (registrationTimeUTC != null) {
            builder.registrationTimeUTC(registrationTimeUTC);
        }

        final T payload = getObject(rootNode, "payload", mPayloadClass);
        if (payload != null) {
            builder.payload(payload);
        }

        final ServiceType serviceType = getObject(rootNode, "serviceType", ServiceType.class);
        if (serviceType != null) {
            builder.serviceType(serviceType);
        }

        final UriSpec uriSpec = getObject(rootNode, "uriSpec", UriSpec.class);
        if (uriSpec != null) {
            builder.uriSpec(uriSpec);
        }

        return builder.build();
    }

}