Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Parameters
----------
cdr_types : list of str, optional
Subset of CDR types for which to find available dates.
If not provided, the union of available dates for all CDR types will be returned.
Returns
-------
list of pendulum.Date
List of available dates, in chronological order
"""
prefect.context.logger.info(
f"Getting available dates from FlowAPI at '{prefect.config.flowapi_url}'."
)
conn = flowclient.connect(
url=prefect.config.flowapi_url, token=environ["FLOWAPI_TOKEN"]
)
dates = flowclient.get_available_dates(connection=conn)
prefect.context.logger.debug(f"Available dates: {dates}")
if cdr_types is None:
prefect.context.logger.debug(
"No CDR types provided. Will return available dates for all CDR types."
)
cdr_types = dates.keys()
else:
prefect.context.logger.debug(
f"Returning available dates for CDR types {cdr_types}."
)
unknown_cdr_types = set(cdr_types).difference(dates.keys())
if unknown_cdr_types:
warnings.warn(f"No data available for CDR types {unknown_cdr_types}.")