def setUp(self):
    self.fs = luigi.contrib.hdfs.get_autoconfig_client()
    cfg_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "testconfig")
    hadoop_bin = os.path.join(os.environ['HADOOP_HOME'], 'bin/hadoop')
    cmd = "{} --config {}".format(hadoop_bin, cfg_path)
    # Stash the current [hadoop] command so it can be restored after the test,
    # then point luigi at the test configuration.
    self.stashed_hdfs_client = luigi.configuration.get_config().get('hadoop', 'command', None)
    luigi.configuration.get_config().set('hadoop', 'command', cmd)
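A matching tearDown should put the stashed value back so later tests see the original [hadoop] command. It is not part of the snippet above; a minimal sketch might look like this:

def tearDown(self):
    # Hypothetical companion to setUp: restore the stashed [hadoop] command.
    # luigi's config parser has no "unset", so write the old value back
    # (or an empty string if no command was configured before).
    luigi.configuration.get_config().set(
        'hadoop', 'command', self.stashed_hdfs_client or '')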
desc = ("The database query mechanism used for processing "
        "NBAR to Fractional Cover.")
parser = argparse.ArgumentParser(description=desc)
parser.add_argument('--cfg',
                    help='Path to a user-defined configuration file.')
parsed_args = parser.parse_args()
cfg = parsed_args.cfg

# Set up the config file: fall back to the bundled fc.cfg when no --cfg is given.
global CONFIG
CONFIG = luigi.configuration.get_config()
if cfg is None:
    CONFIG.add_config_path(pjoin(dirname(__file__), 'fc.cfg'))
else:
    CONFIG.add_config_path(cfg)

out_dir = CONFIG.get('agdc', 'output_directory')
query(out_dir, cfg)
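Whichever file ends up on the config path must provide the [agdc] section read on the last lines. A hypothetical fc.cfg carrying just that option (the path is illustrative):

[agdc]
output_directory: /data/fc/output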
full list of records retrieved.

:param query: the SOQL query to send to Salesforce, e.g.
    `SELECT Id FROM Lead WHERE Email = "waldo@somewhere.com"`
"""
# Make the initial query to Salesforce and pull the field names out of the SOQL.
response = self.query(query, **kwargs)
fields = get_soql_fields(query)

# Put the fields and the first page of results into a temp list
# to be written to the temp file.
tmp_list = [fields]
tmp_list.extend(parse_results(fields, response))

tmp_dir = luigi.configuration.get_config().get('salesforce', 'local-tmp-dir', None)
tmp_file = tempfile.TemporaryFile(mode='a+b', dir=tmp_dir)
writer = csv.writer(tmp_file)
writer.writerows(tmp_list)

# The number of results might have exceeded the Salesforce batch limit,
# so check whether there are more results and retrieve them if so.
length = len(response['records'])
while not response['done']:
    response = self.query_more(response['nextRecordsUrl'], identifier_is_url=True, **kwargs)
    writer.writerows(parse_results(fields, response))
    length += len(response['records'])
    if not length % 10000:
        logger.info('Requested {0} lines...'.format(length))
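Two details worth noting: csv.writer expects a text-mode file on Python 3, so the binary mode='a+b' here only works under Python 2 (or behind an io.TextIOWrapper), and the TemporaryFile must be rewound before anything can read the accumulated rows back. A minimal sketch of a continuation, assuming the method hands the open file to the caller:

# Hypothetical continuation: rewind so readers start at the header row.
tmp_file.seek(0)
return tmp_file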
def driver_class_path(self):
    return configuration.get_config().get(self.spark_version, "driver-class-path", None)
__all__ = [
    'task', 'Task', 'Config', 'ExternalTask', 'WrapperTask', 'namespace', 'auto_namespace',
    'target', 'Target', 'LocalTarget', 'rpc', 'RemoteScheduler',
    'RPCError', 'parameter', 'Parameter', 'DateParameter', 'MonthParameter',
    'YearParameter', 'DateHourParameter', 'DateMinuteParameter', 'DateSecondParameter',
    'DateIntervalParameter', 'TimeDeltaParameter', 'IntParameter',
    'FloatParameter', 'BoolParameter', 'TaskParameter',
    'ListParameter', 'TupleParameter', 'EnumParameter', 'DictParameter',
    'configuration', 'interface', 'local_target', 'run', 'build', 'event', 'Event',
    'NumericalParameter', 'ChoiceParameter', 'OptionalParameter', 'LuigiStatusCode',
    '__version__',
]
if not configuration.get_config().has_option('core', 'autoload_range'):
    import warnings
    warning_message = '''
Autoloading range tasks by default has been deprecated and will be removed in a future version.
To get the new behavior now, add an option to luigi.cfg:

    [core]
    autoload_range: false

Alternatively, set the option to true to keep the existing behaviour and suppress this warning.
'''
    warnings.warn(warning_message, DeprecationWarning)

if configuration.get_config().getboolean('core', 'autoload_range', True):
    from .tools import range  # noqa: F401 makes the range tool classes available from the command line
    __all__.append('range')
def executor_cores(self):
    return configuration.get_config().get(self.spark_version, "executor-cores", None)
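Both of these getters read the config section named by self.spark_version and return None when the option is absent, so each setting is optional. A hypothetical config fragment they would pick up, assuming spark_version is 'spark' (values are illustrative):

[spark]
driver-class-path: /opt/hadoop/lib/*
executor-cores: 4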
def _read_configuration_file(self, disdat_config_file, luigi_config_file):
    """
    Check for environment variable 'DISDAT_CONFIG_PATH' -- it should point to disdat.cfg.
    Paths in the config might be relative; if so, add the prefix to them.
    Next, see if there is a disdat.cfg in cwd. Then configure disdat and (re)configure logging.
    """
    # _logger.debug("Loading config file [{}]".format(disdat_config_file))
    config = configparser.ConfigParser({'meta_dir_root': self.meta_dir_root, 'ignore_code_version': 'False'})
    config.read(disdat_config_file)
    self.meta_dir_root = os.path.expanduser(config.get('core', 'meta_dir_root'))
    self.meta_dir_root = DisdatConfig._fix_relative_path(disdat_config_file, self.meta_dir_root)
    self.ignore_code_version = config.getboolean('core', 'ignore_code_version')

    # Set up the luigi configuration
    luigi.configuration.get_config().read(luigi_config_file)

    # Tell everything to push warnings through the logging infrastructure
    logging.captureWarnings(True)

    # Unfortunately that's not enough -- kill all luigi (and disdat) warnings
    import warnings
    warnings.filterwarnings("ignore")

    meta_dir = os.path.join(self.meta_dir_root, META_DIR)
    if not os.path.exists(meta_dir):
        os.makedirs(meta_dir)

    return config
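The dict passed to the ConfigParser constructor supplies fallback values, so the two [core] reads above succeed even when disdat.cfg omits them. A minimal disdat.cfg that overrides both (paths are illustrative):

[core]
meta_dir_root: ~/.disdat
ignore_code_version: True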
try:
    from elasticsearch.helpers import bulk_index
    import elasticsearch
    if elasticsearch.__version__ < (1, 0, 0):
        logger.warning("This module works with elasticsearch 1.0.0 "
                       "or newer only.")
except ImportError:
    logger.warning("Loading esindex module without elasticsearch installed. "
                   "Will crash at runtime if esindex functionality is used.")


class ElasticsearchTarget(luigi.Target):
    """ Target for a resource in Elasticsearch. """

    marker_index = luigi.configuration.get_config().get('elasticsearch',
                                                        'marker-index', 'update_log')
    marker_doc_type = luigi.configuration.get_config().get('elasticsearch',
                                                           'marker-doc-type', 'entry')

    def __init__(self, host, port, index, doc_type, update_id,
                 marker_index_hist_size=0):
        """
        Args:
            host (str): Elasticsearch server host
            port (int): Elasticsearch server port
            index (str): Index name
            doc_type (str): Doctype name
            update_id (str): An identifier for this data set
            marker_index_hist_size (int): Number of changes to the index to remember
        """
        self.host = host
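The constructor is cut off above, but the documented signature is enough to sketch a hypothetical instantiation (host, index, and update_id values are assumptions):

# Hypothetical usage of the target; marker_index_hist_size keeps its default of 0.
target = ElasticsearchTarget(
    host='localhost', port=9200,
    index='documents', doc_type='entry',
    update_id='copy_to_index_2019-01-01',
)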