def crawldir(path, language=None, foreign_id=None):
    """Crawl the given directory."""
    path = Path(path)
    if foreign_id is None:
        foreign_id = 'directory:%s' % slugify(path)
    collection = ensure_collection(foreign_id, path.name)
    log.info('Crawling %s to %s (%s)...', path, foreign_id, collection.id)
    crawl_directory(collection, path)
    log.info('Complete. Make sure a worker is running :)')
    update_collection(collection)
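
For reference, a minimal sketch of how the foreign_id above is derived, assuming the module uses normality's slugify (the snippet's imports are not shown):

from pathlib import Path
from normality import slugify

path = Path('/data/Court Filings 2019')  # hypothetical directory
# Non-alphanumeric characters collapse into the separator, lowercased:
print('directory:%s' % slugify(str(path)))  # directory:data-court-filings-2019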
# The enclosing signature is not shown in the snippet; crawl_page(url) is
# an assumed name for illustration.
def crawl_page(url):
    res = requests.get(url)
    # empty = 'class="nom_fugitif_wanted">Identity unknown' not in res.content
    # if empty:
    #     print("MISSING", url)
    #     return
    doc = html.fromstring(res.content)
    data = {
        'url': url,
        'last_updated': datetime.utcnow().isoformat(),
        'name': doc.find('.//div[@class="nom_fugitif_wanted"]').text_content(),
        'reason': doc.find('.//span[@class="nom_fugitif_wanted_small"]').text_content(),
        # 'html': res.content
    }
    # Each detail row is a (label, value) pair; the label text is slugified
    # into a snake_case dict key.
    for row in doc.findall('.//div[@class="bloc_detail"]//tr'):
        title, value = row.findall('./td')
        name = slugify(title.text_content(), sep='_')
        if len(name):
            data[name] = value.text_content().strip()
    print('Wanted: %s' % data['name'])
    return data
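
A quick illustration of the key normalization in the loop above, with hypothetical label text matching the page's French markup:

from normality import slugify

print(slugify('Date de naissance', sep='_'))    # date_de_naissance
print(slugify('Lieu de naissance :', sep='_'))  # lieu_de_naissance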
def to_proxy(self):
    proxy = model.get_proxy({
        'id': str(self.id),
        'schema': self.model,
        'properties': {}
    })
    meta = dict(self.meta)
    headers = meta.pop('headers', {})
    if is_mapping(headers):
        # Normalize HTTP header names to snake_case keys.
        headers = {slugify(k, sep='_'): v for k, v in headers.items()}
    else:
        headers = {}
    proxy.set('contentHash', self.content_hash)
    proxy.set('parent', self.parent_id)
    proxy.set('ancestors', self.ancestors)
    proxy.set('crawler', meta.get('crawler'))
    proxy.set('sourceUrl', meta.get('source_url'))
    proxy.set('title', meta.get('title'))
    proxy.set('fileName', meta.get('file_name'))
    if not proxy.has('fileName'):
        disposition = headers.get('content_disposition')
        if disposition is not None:
            _, attrs = cgi.parse_header(disposition)
            proxy.set('fileName', attrs.get('filename'))
    proxy.set('mimeType', meta.get('mime_type'))
    if not proxy.has('mimeType'):
        # The original snippet is truncated here; a plausible continuation,
        # by analogy with the fileName fallback above (assumption):
        proxy.set('mimeType', headers.get('content_type'))
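
A sketch of the header handling above, with made-up header values; normalizing header names via slugify(..., sep='_') is what makes the later 'content_disposition' lookup work:

import cgi
from normality import slugify

headers = {'Content-Disposition': 'attachment; filename="report.pdf"'}
headers = {slugify(k, sep='_'): v for k, v in headers.items()}
_, attrs = cgi.parse_header(headers['content_disposition'])
print(attrs.get('filename'))  # report.pdf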
def convrow(data):
    row = {}
    for name, val in data.items():
        name = name.upper()
        if name.startswith('DTE') and val is not None:
            # Epoch milliseconds -> ISO date string.
            dt = datetime.fromtimestamp(int(val) / 1000)
            val = dt.date().isoformat()
        if name.startswith('GUID'):
            continue
        if name == 'AREA':
            # Clamp to the maximum 32-bit signed integer.
            val = min(val, (2 ** 31) - 1)
        if name == 'ID':
            name = 'FC_ID'
        if val is None or not str(val).strip():
            continue
        row[slugify(name, sep='_')] = val
    return row
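
Hypothetical input showing convrow's renaming, clamping, and filtering, assuming the definitions above are in scope:

print(convrow({
    'ID': 7,
    'GUID': 'abc-123',     # dropped
    'TITLE HOLDER': 'ACME Ltd',
    'AREA': 2 ** 40,       # clamped to int32 max
}))
# {'fc_id': 7, 'title_holder': 'ACME Ltd', 'area': 2147483647}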
def slugify(mapping, bind, values):
    """ Transform all values into URL-capable slugs. """
    for value in values:
        if isinstance(value, six.string_types):
            value = transliterate(value)
            value = normality.slugify(value)
        yield value
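
A usage sketch for this transform; mapping and bind are unused by the body, so None placeholders suffice here:

print(list(slugify(None, None, ['Aleph Data', 'Ünïcode', 42])))
# ['aleph-data', 'unicode', 42]; non-string values pass through untouched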
def parse_file(path):
    with open(path, 'rb') as fh:
        ctx = json.load(fh)
    if ctx['source_name'] not in ['MZ']:
        return
    all_name = slugify('%(source_name)s flexicadastre' % ctx, sep='_')
    all_tbl = database[all_name]
    all_tbl.delete()
    layers = ctx.pop('layers')
    for layer in layers:
        lctx = ctx.copy()
        lctx['layer_name'] = layer['name']
        lctx['layer_id'] = layer['id']
        del lctx['rest_url']
        tbl_name = slugify('%(source_name)s %(layer_name)s' % lctx, sep='_')
        tbl = database[tbl_name]
        tbl.delete()
        features = layer['data']['features']
        print(' -> Generating:', tbl_name)
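
The table names above are just slugified interpolations of the layer context; with hypothetical values:

from normality import slugify

lctx = {'source_name': 'MZ', 'layer_name': 'Mining Licenses'}
print(slugify('%(source_name)s %(layer_name)s' % lctx, sep='_'))  # mz_mining_licenses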
def column_alias(cell, names):
    """ Generate a normalized version of the column name. """
    column = slugify(cell.column or '', sep='_')
    column = column.strip('_')
    column = 'column' if not len(column) else column
    name, i = column, 2
    # de-dupe: column, column_2, column_3, ...
    while name in names:
        name = '%s_%s' % (name, i)
        i += 1
    return name
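
A runnable sketch of the de-dupe behaviour, using a hypothetical stand-in for the real cell type and assuming the definitions above are in scope:

from collections import namedtuple

Cell = namedtuple('Cell', ['column'])  # stand-in for the real cell object
names = ['name', 'name_2']
print(column_alias(Cell('Name!'), names))  # name_3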
def load_countries():
    if len(COUNTRIES):
        return COUNTRIES
    with open(os.path.join(DATA_FIXTURES, 'countries.csv'), 'r') as fh:
        for row in unicodecsv.DictReader(fh):
            name = slugify(row['name'], sep=' ').strip()
            code = row['code'].strip().upper()
            REQUESTED.append({'name': row['name'], 'code': code})
            COUNTRIES[name] = code
    return COUNTRIES
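
Because the lookup keys are slugified with a space separator, matching tolerates case, accents, and punctuation; for example:

from normality import slugify

print(slugify("Côte d'Ivoire", sep=' '))  # cote d ivoire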
def normalize_country(name):
    if name is None:
        return
    normed = slugify(name, sep=' ').strip()
    if not len(normed):
        return
    countries = load_countries()
    if normed in countries:
        if countries[normed] == 'XX':
            return
        return countries[normed]
    normed = normed.upper()
    if normed in countries.values():
        return normed
    # Unknown country: queue it once in REQUESTED for manual mapping.
    for req in REQUESTED:
        if req['name'] == name:
            return
    REQUESTED.append({'name': name, 'code': None})
    save_requested()
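
A usage sketch, assuming the countries fixture maps 'germany' to 'DE':

normalize_country('  GERMANY ')  # 'DE'  (matched by slugified name)
normalize_country('de')          # 'DE'  (matched as an existing code)
normalize_country('Atlantis')    # None; queued in REQUESTED for manual review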