_renew = renew(self, kwargs)
_serializer = serializer(self, kwargs)
_metadata = metadata(self, kwargs)
if _cache is None or not _use_cache:
return method(self, **kwargs)
if _cache.has_key(_key, namespace=_namespace) and not _renew: # noqa: has_key is not of a dictionary here
try:
return _cache.get(
_key,
namespace=_namespace,
serializer=_serializer
)
except:
logger.warning("Failed to retrieve results from cache. Renewing the cache...")
if config.cache_fail_hard:
six.reraise(*sys.exc_info())
finally:
logger.caveat('Loaded from cache')
# Renewing/creating cache
value = method(self, **kwargs)
if value is None:
logger.warning("Method value returned None. Not saving to cache.")
return
try:
        _cache.set(
            _key,
            value=value,
            namespace=_namespace,
            serializer=_serializer,
            metadata=_metadata
        )
warning_deps[dep] = "{}=={}".format(package_name, pkg_resources.get_distribution(m.group(0)).version)
except:
# Some packages may be available, but not installed. If so, we
# should accept them with warnings (if version specified in dep).
try:
importlib.import_module(package_name)
if not accept_any_version:
                    warning_deps[dep] = '{}=='.format(package_name)
except: # ImportError in python 2, ModuleNotFoundError in Python 3
missing_deps.append(dep)
if warning_deps:
message = "You may have some outdated packages:\n"
for key in sorted(warning_deps):
            message += '\t- Want {}, found {}\n'.format(key, warning_deps[key])
logger.warning(message)
if missing_deps:
message = message or "Whoops! You do not seem to have all the dependencies required."
fix = ("You can fix this by running:\n\n"
"\t{install_command}\n\n"
"Note: Depending on your system's installation of Python, you may "
"need to use `pip2` or `pip3` instead of `pip`.").format(install_command='pip install --upgrade ' + ' '.join(missing_deps))
raise RuntimeError('\n\n'.join([message, fix]))
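# A minimal, hypothetical sketch of the requirement check that feeds the
# warning/missing handling above (not the library's actual helper):
# pkg_resources raises DistributionNotFound when a package is absent and
# VersionConflict when it is installed at an incompatible version.
import pkg_resources

def classify_requirement(requirement):
    try:
        pkg_resources.require(requirement)
        return 'ok'
    except pkg_resources.VersionConflict:
        return 'outdated'   # installed, but at a version that violates the pin
    except pkg_resources.DistributionNotFound:
        return 'missing'    # not installed at all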
def _connect(self):
from pydruid.db import connect
logger.info('Connecting to Druid database ...')
self.__druid = connect(self.host, self.port, path='/druid/v2/sql/', scheme='http')
if self.username or self.password:
logger.warning(
'Duct username and password not passed to pydruid connection. '
                'pydruid connection currently does not allow these fields to be passed.'
            )
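# Hedged usage sketch (not part of _connect above): pydruid.db exposes a
# DB-API 2.0 connection, so the SQL endpoint can be queried through a cursor.
# Host, port and datasource below are placeholders.
from pydruid.db import connect

conn = connect('localhost', 8082, path='/druid/v2/sql/', scheme='http')
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) AS n FROM my_datasource')
print(list(cursor))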
_cache.set(
_key,
value=value,
namespace=_namespace,
serializer=_serializer,
metadata=_metadata
)
# Return from cache every time, just in case serialization operation was
# destructive (e.g. reading from cursors)
return _cache.get(
_key,
namespace=_namespace,
serializer=_serializer
)
except:
logger.warning("Failed to save results to cache. If needed, please save them manually.")
if config.cache_fail_hard:
six.reraise(*sys.exc_info())
return value # As a last resort, return value object (which could be mutated by serialization).
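# Illustrative sketch of the read-through caching pattern used above, with a
# plain in-memory dict standing in for the cache client; all names here are
# hypothetical and not part of the library.
import functools

def simple_cached(method):
    store = {}

    @functools.wraps(method)
    def wrapped(self, **kwargs):
        key = (method.__name__, tuple(sorted(kwargs.items())))
        if key in store:
            return store[key]               # cache hit
        value = method(self, **kwargs)      # cache miss: compute the value
        if value is not None:
            store[key] = value              # never cache None results
        return store.get(key, value)        # read back, mirroring the code above
    return wrapped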
try:
from sqlalchemy.sql.base import Executable
if isinstance(statement, Executable):
statement = str(statement.compile(compile_kwargs={"literal_binds": True}))
except ImportError:
pass
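        # Hedged illustration of the branch above (placeholder names): with
        #   t = sqlalchemy.table('events', sqlalchemy.column('id'), sqlalchemy.column('kind'))
        #   stmt = sqlalchemy.select(t.c.id).where(t.c.kind == 'click')
        # str(stmt.compile(compile_kwargs={"literal_binds": True})) yields plain SQL
        # with the bound value inlined, e.g.
        #   SELECT events.id FROM events WHERE events.kind = 'click'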
if context is None or context is False:
context = {}
template_context = {}
template_context.update(self._template_context) # default context
template_context.update(context) # context passed in
intersection = set(self._template_context.keys()) & set(context.keys())
if intersection:
logger.warning(
"The following default template context keys have been overridden "
"by the local context: {}."
.format(intersection)
)
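        # Worked example of the merge above (hypothetical values): with defaults
        # {'db': 'prod', 'limit': 100} and a local context of {'limit': 10}, the
        # merged template_context is {'db': 'prod', 'limit': 10} and the warning
        # reports the overridden key {'limit'}.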
# Substitute in any other named statements recursively
while '{{{' in statement or '{{%' in statement:
statement = (
jinja2.Template(
statement,
block_start_string='{{%',
block_end_string='%}}',
variable_start_string='{{{',
variable_end_string='}}}',
comment_start_string='{{#',
comment_end_string='#}}',
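# Self-contained sketch of the rendering step that is truncated above: the
# non-standard '{{{ ... }}}' and '{{% ... %}}' delimiters let ordinary
# '{{ }}' sequences in SQL pass through untouched. Table and limit values
# here are placeholders.
import jinja2

rendered = jinja2.Template(
    'SELECT * FROM {{{ table }}} LIMIT {{{ limit }}}',
    block_start_string='{{%',
    block_end_string='%}}',
    variable_start_string='{{{',
    variable_end_string='}}}',
    comment_start_string='{{#',
    comment_end_string='#}}',
).render(table='events', limit=10)
# rendered == 'SELECT * FROM events LIMIT 10'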
'f': 'DOUBLE', # floating-point
'c': 'STRING', # complex floating-point
'O': 'STRING', # object
'S': 'STRING', # (byte-)string
'U': 'STRING', # Unicode
'V': 'STRING' # void
}
# Sanitise column names and map numpy/pandas data-types to hive types.
columns = []
for col, dtype in df.dtypes.iteritems():
col_sanitized = re.sub(r'\W', '', col.lower().replace(' ', '_'))
hive_type = dtype_overrides.get(col) or DTYPE_KIND_HIVE_TYPE.get(dtype.kind)
if hive_type is None:
hive_type = DTYPE_KIND_HIVE_TYPE['O']
logger.warning(
'Unable to determine hive type for dataframe column {col} of pandas dtype {dtype}. '
                'Defaulting to hive type {hive_type}. If a different column type is desired, '
'please specify via `dtype_overrides`'
.format(**locals())
)
columns.append(
' {column} {type}'.format(column=col_sanitized, type=hive_type)
)
partition_columns = ['{} STRING'.format(col) for col in partition_cols]
tblprops = ["'{key}' = '{value}'".format(key=key, value=value) for key, value in table_props.items()]
tblprops = "TBLPROPERTIES({})".format(",".join(tblprops)) if len(tblprops) > 0 else ""
cmd = Template("""
{% if drop %}
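# Hedged illustration of the pieces assembled above (hypothetical names, not the
# template's literal output): a float64 column named "Page Views" is sanitised
# to 'page_views' and mapped to DOUBLE via DTYPE_KIND_HIVE_TYPE['f'], each
# partition key becomes an extra STRING column, and table properties render as
# TBLPROPERTIES('key' = 'value'). The resulting DDL looks roughly like:
#
#   CREATE TABLE my_db.my_table (
#       page_views DOUBLE
#   )
#   PARTITIONED BY (ds STRING)
#   TBLPROPERTIES('key' = 'value')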
# Save dataframe to file and send it to the remote server if necessary
temp_dir = tempfile.mkdtemp(prefix='omniduct_hiveserver2')
tmp_fname = os.path.join(temp_dir, 'data_{}.csv'.format(time.time()))
logger.info('Saving dataframe to file... {}'.format(tmp_fname))
df.fillna(r'\N').to_csv(tmp_fname, index=False, header=False,
sep=sep, encoding='utf-8')
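        # Note on the fillna above: Hive's default text serde reads the two-character
        # sequence \N as NULL, so missing values are written out as \N rather than as
        # empty strings before the file is loaded into the table.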
if self.remote:
logger.info("Uploading data to remote host...")
self.remote.upload(tmp_fname)
# Generate create table statement.
auto_table_props = set(self.default_table_props).difference(table_props)
if len(auto_table_props) > 0:
logger.warning(
"In addition to any specified table properties, this "
"HiveServer2Client has added the following default table "
"properties:\n{default_props}\nTo override them, please "
"specify overrides using: `.push(..., table_props={{...}}).`"
.format(default_props=json.dumps({
prop: value for prop, value in self.default_table_props.items()
if prop in auto_table_props
}, indent=True))
)
tblprops = self.default_table_props.copy()
tblprops.update(table_props or {})
cts = self._create_table_statement_from_df(
df=df,
table=table,
drop=(if_exists == 'replace') and not partition,