Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" 

2pyLibravatar - Python module for Libravatar. 

3 

4Easy way to make use of the federated Libravatar.org avatar hosting service 

5from within your Python applications. 

6 

7Copyright (C) 2011 - 2017 Francois Marier <francois@libravatar.org> 

8Copyright (C) 2018 - Oliver Falk <oliver@linux-kernel.at> 

9 

10Permission is hereby granted, free of charge, to any person obtaining a copy 

11of this software and associated documentation files (the "Software"), to 

12deal in the Software without restriction, including without limitation the 

13rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 

14sell copies of the Software, and to permit persons to whom the Software is 

15furnished to do so, subject to the following conditions: 

16 

17The above copyright notice and this permission notice shall be included in 

18all copies or substantial portions of the Software. 

19 

20THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 

21IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 

22FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 

23AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 

24LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 

25FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 

26IN THE SOFTWARE. 

27""" 

28 

29from __future__ import print_function 

30 

31import hashlib 

32import random 

33import re 

34import DNS 

35 

36try: 

37 # Python-3.x 

38 # pylint: disable=F0401,E0611 

39 from urllib.parse import urlsplit, urlunsplit 

40 from urllib.parse import quote_plus 

41except ImportError: 

42 # Python-2.x 

43 from urlparse import urlsplit, urlunsplit 

44 from urllib import quote_plus 

45 

46 

# Avatar endpoints used when no delegation server is available for the
# domain (plain HTTP and HTTPS variants).
BASE_URL = 'http://cdn.libravatar.org/avatar/'
SECURE_BASE_URL = 'https://seccdn.libravatar.org/avatar/'
# DNS SRV service prefixes; the domain is appended to build the name to query.
SERVICE_BASE = '_avatars._tcp'
SECURE_SERVICE_BASE = '_avatars-sec._tcp'
# Bounds that a requested avatar size is clamped to.
MIN_AVATAR_SIZE = 1
MAX_AVATAR_SIZE = 512

53 

54 

def libravatar_url(email=None, openid=None, https=False,
                   default=None, size=None):
    """
    Build the avatar URL for an email address or an OpenID.

    The identity is hashed, ``default`` and ``size`` are turned into a query
    string, and the identity's domain is checked for a delegated avatar
    server before the final URL is assembled.
    """
    identity_hash, identity_domain = parse_user_identity(email, openid)
    options = parse_options(default, size)
    server = lookup_avatar_server(identity_domain, https)

    return compose_avatar_url(server, identity_hash, options, https)

64 

65 

def parse_options(default, size):
    """
    Turn the optional parameters into a URL query string.

    ``default``, when truthy, is URL-quoted into a ``d=`` parameter.
    ``size``, when truthy, is converted to an integer, clamped to the
    MIN_AVATAR_SIZE..MAX_AVATAR_SIZE range and emitted as an ``s=``
    parameter. A size that cannot be converted to an integer (wrong type,
    non-numeric text, infinity) is treated as invalid and skipped.

    Returns the query string including its leading ``?``, or an empty
    string when there is nothing to add.
    """
    params = []

    if default:
        params.append('d=%s' % quote_plus(str(default)))

    if size:
        try:
            requested = int(size)
        except (TypeError, ValueError, OverflowError):
            requested = None  # invalid size, skip

        if requested is not None:
            clamped = max(MIN_AVATAR_SIZE, min(MAX_AVATAR_SIZE, requested))
            params.append('s=%s' % clamped)

    if not params:
        return ''

    return '?' + '&'.join(params)

85 

86 

def parse_user_identity(email, openid):
    """
    Compute the avatar hash for an email address or an OpenID URL.

    An email address is trimmed, lowercased and hashed with MD5; its domain
    is whatever follows the last ``@``. An OpenID URL is trimmed, rebuilt
    from its lowercased scheme, its username/password (if any), hostname,
    path, query and fragment, then hashed with SHA-256; its domain is the
    URL hostname. The email wins when both are supplied.

    Returns a ``(hex_digest, domain)`` tuple, or ``(None, None)`` when
    neither identifier is given.
    """
    if email:
        canonical = email.strip().lower()
        domain = canonical.split('@')[-1]
        algorithm = 'md5'
    elif openid:
        # pylint: disable=E1103
        parts = urlsplit(openid.strip())
        host = parts.hostname
        if parts.username:
            secret = parts.password or ''
            netloc = parts.username + ':' + secret + '@' + host
        else:
            netloc = host
        canonical = urlunsplit((parts.scheme.lower(), netloc, parts.path,
                                parts.query, parts.fragment))
        domain = host
        algorithm = 'sha256'
    else:
        # email and openid both missing
        return (None, None)

    digest = hashlib.new(algorithm)
    digest.update(canonical.encode('utf-8'))
    return (digest.hexdigest(), domain)

116 

117 

def compose_avatar_url(delegation_server, avatar_hash, query_string, https):
    """
    Put the final avatar URL together from its parts.

    When a delegation server is given, the URL points at that server using
    the scheme selected by ``https``; otherwise the matching default base
    URL (SECURE_BASE_URL or BASE_URL) is used. A missing hash or query
    string is treated as an empty string.
    """
    if delegation_server:
        if https:
            prefix = "https://%s/avatar/" % delegation_server
        else:
            prefix = "http://%s/avatar/" % delegation_server
    elif https:
        prefix = SECURE_BASE_URL
    else:
        prefix = BASE_URL

    return prefix + (avatar_hash or '') + (query_string or '')

134 

135 

def service_name(domain, https):
    """
    Build the DNS service name to query for a domain and scheme.

    Returns ``<service prefix>.<domain>`` using the secure prefix when
    ``https`` is truthy, or None when no domain is given.
    """
    if domain:
        prefix = SECURE_SERVICE_BASE if https else SERVICE_BASE
        return "%s.%s" % (prefix, domain)

    return None

145 

146 

def lookup_avatar_server(domain, https):
    """
    Extract the avatar server from an SRV record in the DNS zone.

    The SRV records should look like this:

       _avatars._tcp.example.com.     IN SRV 0 0 80  avatars.example.com
       _avatars-sec._tcp.example.com. IN SRV 0 0 443 avatars.example.com

    Returns the value produced by normalized_target() for the SRV records
    found, or None when there is no domain to look up, the DNS request
    fails, or the response status is anything other than NOERROR.
    """
    if not domain:
        # Nothing to look up, so don't touch the resolver at all
        return None

    DNS.DiscoverNameServers()
    try:
        dns_request = DNS.Request(name=service_name(domain, https),
                                  qtype='SRV', protocol='tcp').req()
    except DNS.DNSError as message:
        print("DNS Error: %s" % message)
        return None

    if dns_request is None:
        # No servers discovered?
        return None

    status = dns_request.header['status']
    if status == 'NXDOMAIN':
        # Not an error, but no point in going any further
        return None

    if status != 'NOERROR':
        print("DNS Error: status=%s" % status)
        return None

    records = []
    for answer in dns_request.answers:
        data = answer.get('data')
        if not data or answer.get('typename') != 'SRV':
            continue

        try:
            srv_record = {'priority': int(data[0]),
                          'weight': int(data[1]),
                          'port': int(data[2]),
                          'target': data[3]}
        except (IndexError, TypeError, ValueError):
            # Malformed SRV payload: ignore this answer, keep the others
            continue

        records.append(srv_record)

    return normalized_target(records, https)

191 

192 

def normalized_target(records, https):
    """
    Choose a server among the SRV records and return its hostname.

    The port is appended as ``host:port`` only when it differs from the
    standard one for the scheme (443 with https, 80 otherwise). When no
    valid target is available, the empty result of sanitize_target() is
    returned as is.
    """
    host, port = sanitize_target(srv_hostname(records))
    standard_port = 443 if https else 80

    if host and port != standard_port:
        return "%s:%s" % (host, port)

    return host

206 

207 

def sanitize_target(args):
    """
    Ensure we are getting a valid hostname and port from the DNS resolver.

    ``args`` is a ``(target, port)`` pair. The target must consist solely
    of letters, digits, dots and hyphens, and the port must convert to an
    integer between 1 and 65535.

    Returns the pair unchanged when both parts are valid, otherwise
    ``(None, None)``.
    """
    target, port = args

    if not target or not port:
        return (None, None)

    # Anchor with \Z, not $: "$" also matches just before a trailing
    # newline, which would let "host\n" through the validation.
    if not re.match(r'^[0-9a-zA-Z.-]+\Z', str(target)):
        return (None, None)

    try:
        port_number = int(port)
    except (TypeError, ValueError):
        return (None, None)

    if port_number < 1 or port_number > 65535:
        return (None, None)

    return (target, port)

225 

226 

def srv_hostname(records):
    """
    Pick one (target, port) pair out of a list of SRV records.

    Only the records sharing the best priority (lowest number) compete.
    When several remain, one is drawn at random according to the RFC2782
    weight ordering algorithm. Returns ``(None, None)`` for an empty list.
    """
    count = len(records)
    if count < 1:
        return (None, None)

    if count == 1:
        only = records[0]
        return (only['target'], only['port'])

    # highest priority = lowest number
    best_priority = min(entry['priority'] for entry in records)

    zero_weighted = []
    weighted = []
    running_total = 0
    for entry in records:
        if entry['priority'] != best_priority:
            # not in the top priority group
            continue

        running_total += entry['weight']
        if entry['weight'] > 0:
            weighted.append((running_total, entry))
        else:
            # zero-weight elements must come first
            zero_weighted.insert(0, (0, entry))

    candidates = zero_weighted + weighted

    if len(candidates) == 1:
        chosen = candidates[0][1]
        return (chosen['target'], chosen['port'])

    # Select first record according to RFC2782 weight
    # ordering algorithm (page 3)
    threshold = random.randint(0, running_total)
    for running_sum, entry in candidates:
        if running_sum >= threshold:
            return (entry['target'], entry['port'])

    print('There is something wrong with our SRV weight ordering algorithm')
    return (None, None)