Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_date_range(dt, expected):
fieldname = "acquired"
arg = "gte"
assert filters.date_range(fieldname, **{arg: dt}) == expected
def _builder(self):
return filters.range_filter
def get_items_count(combined_filter, item_types):
req = filters.build_search_request(combined_filter, item_types, interval="year") #year or day
stats = client.stats(req).get()
# p(stats)
total_count = 0
for bucket in stats['buckets']:
total_count += bucket['count']
return total_count
def get_a_filter_cli_api(polygon_json,start_date, end_date, could_cover_thr):
geo_filter = filters.geom_filter(polygon_json)
date_filter = filters.date_range('acquired', gte=start_date, lte = end_date)
cloud_filter = filters.range_filter('cloud_cover', lte=could_cover_thr)
combined_filters = filters.and_filter(geo_filter, date_filter, cloud_filter)
return combined_filters
'''
create a filter based on a geometry, date range, cloud cover
:param polygon_json: a polygon in json format
:param start_date: start date
:param end_date: end date
:param could_cover_thr: images with cloud cover less than this value
:return: a combined filter (and filter)
'''
# gt: Greater Than
# gte: Greater Than or Equal To
# lt: Less Than
# lte: Less Than or Equal To
geo_filter = filters.geom_filter(polygon_json)
date_filter = filters.date_range('acquired', gte=start_date, lte = end_date)
cloud_filter = filters.range_filter('cloud_cover', lte=could_cover_thr)
combined_filters = filters.and_filter(geo_filter, date_filter, cloud_filter)
return combined_filters
def convert(self, val, param, ctx):
val = read(val)
if not val:
return []
try:
filt = json.loads(val)
except ValueError:
raise click.BadParameter('invalid JSON')
if not filters.is_filter_like(filt):
self.fail('Does not appear to be valid filter', param, ctx)
return filt
def and_filter_from_opts(opts):
'''build an AND filter from the provided opts dict as passed to a command
from the filter_options decorator. Assumes all dict values are lists of
filter dict constructs.'''
return filters.and_filter(*list(chain.from_iterable([
o for o in opts.values() if o]
)))
def search_req_from_opts(**kw):
# item_type will be list of lists - flatten
item_types = chain.from_iterable(kw.pop('item_type'))
name = kw.pop('name', '')
interval = kw.pop('interval', '')
filt = filter_from_opts(**kw)
return filters.build_search_request(
filt, item_types, name=name, interval=interval)
def convert(self, val, param, ctx):
val = read(val)
if not val:
return []
try:
geoj = json.loads(val)
except ValueError:
raise click.BadParameter('invalid GeoJSON')
geom = geometry_from_json(geoj)
if geom is None:
raise click.BadParameter('unable to find geometry in input')
return [filters.geom_filter(geom)]