Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
conn.execute("""
INSERT INTO {} VALUES (
'{}', '{}'
)
""".format(mysql_table, *db_record))
from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
import unicodecsv as csv
op = MySqlToHiveTransfer(
task_id='test_m2h',
hive_cli_conn_id='hive_cli_default',
sql="SELECT * FROM {}".format(mysql_table),
hive_table=hive_table,
recreate=True,
delimiter=",",
quoting=csv.QUOTE_NONE,
quotechar='',
escapechar='@',
dag=self.dag)
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
from airflow.hooks.hive_hooks import HiveServer2Hook
hive_hook = HiveServer2Hook()
result = hive_hook.get_records("SELECT * FROM {}".format(hive_table))
self.assertEqual(result[0], db_record)
finally:
with hook.get_conn() as conn:
conn.execute("DROP TABLE IF EXISTS {}".format(mysql_table))
import six
if sys.version_info[0] == 3:
import csv
else:
import unicodecsv as csv
class CSVLister(ListFormatter):
# Maps each accepted value of the --quote command-line option to the
# corresponding csv-module quoting constant handed to the CSV writer.
QUOTE_MODES = {
'all': csv.QUOTE_ALL,
'minimal': csv.QUOTE_MINIMAL,
'nonnumeric': csv.QUOTE_NONNUMERIC,
'none': csv.QUOTE_NONE,
}
def add_argument_group(self, parser):
    """Attach the 'CSV Formatter' option group to ``parser``.

    A single ``--quote`` option is registered. Its value is stored on the
    parsed namespace as ``quote_mode``, it must be one of the keys of
    ``self.QUOTE_MODES``, and it defaults to ``'nonnumeric'``.
    """
    csv_options = parser.add_argument_group('CSV Formatter')
    # Sort the mode names so the choices appear in a stable order.
    quote_choices = sorted(self.QUOTE_MODES)
    csv_options.add_argument(
        '--quote',
        dest='quote_mode',
        choices=quote_choices,
        default='nonnumeric',
        help='when to include quotes, defaults to nonnumeric',
    )
def emit_list(self, column_names, data, stdout, parsed_args):
writer_kwargs = dict(
quoting=self.QUOTE_MODES[parsed_args.quote_mode],
lineterminator=os.linesep,