Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if os.path.exists(self.history_file):
readline.read_history_file(self.history_file)
self.prompt_text = u.STY_CURSOR + ' > ' + u.STY_USER
try:
while True:
if self.CHANNEL_IN == 'letschat':
self.user_input = self.lc.poll_LC()
elif self.CHANNEL_IN == 'email':
self.email_item = poll_email()
if self.email_item is None:
self.user_input = ''
else:
self.user_input = self.email_item['user_input']
else: # screen
self.user_input = raw_input(self.prompt_text)
print(style.RESET, end="")
# Only want these enabled when running locally (ie screen)
if self.CHANNEL_IN == 'screen':
if self.user_input.lower() == 'q':
print(
'\t\t' + u.STY_RESP + ' Okay. Goodbye! ' + u.STY_USER +
'\n')
self.before_quit()
elif self.user_input.lower() == 't':
self.tag_last()
continue
elif self.user_input.lower() == 'v':
self.verbose = not self.verbose
self.print_settings('Verbose responses: ' + str(self.verbose))
continue
elif self.user_input.lower() == 's':
if os.path.exists(HISTORY_FILENAME):
readline.read_history_file(HISTORY_FILENAME)
prompt_text = STY_CURSOR + ' > ' + STY_USER
try:
while True:
if CHANNEL_IN == 'online':
user_input = poll_LC()
elif CHANNEL_IN == 'email':
email_item = poll_email()
if email_item is None:
user_input = ''
else:
user_input = email_item['user_input']
else: # screen
user_input = raw_input(prompt_text)
print(style.RESET, end="")
if user_input.lower() == 'q':
print(
'\t\t' + STY_RESP + ' Okay. Goodbye! ' + STY_USER +
'\n')
before_quit()
elif user_input.lower() == 't':
tag_last()
elif user_input.lower() == 'v':
verbose = not verbose
print_settings('Verbose responses: ' + str(verbose))
elif user_input.lower() == 's':
show_parse = not show_parse
print_settings(
'Show_parse: ' + {True: 'on', False: 'off'}[show_parse])
elif user_input.lower() == 'd':
ch.setLevel(logging.DEBUG)
STY_CURSOR = fore.LIGHT_GOLDENROD_2B + back.BLACK + style.BOLD
STY_RESP = fore.WHITE + back.MEDIUM_VIOLET_RED + style.BOLD
# STY_RESP = fore.WHITE + back.GREY_11 + style.BOLD #+ style.NORMAL
STY_EMAIL = fore.WHITE + back.GREY_11 + style.BOLD
STY_RESP_SPECIAL = fore.WHITE + back.LIGHT_GOLDENROD_2B + style.BOLD
STY_HIGHLIGHT = fore.DEEP_PINK_4C + back.BLACK
STY_DRAW = fore.WHITE + back.BLACK + style.BOLD
logger = logging.getLogger(BOTNAME)
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.WARN)
# ch.setLevel(logging.DEBUG)
formatter = logging.Formatter(
STY_DESC_DEBUG + '%(asctime)s - %(name)s - %(levelname)8s - %(message)s' +
style.RESET, datefmt='%Y-%b-%d %H:%M:%S')
ch.setFormatter(formatter)
logger.addHandler(ch)
numwords = {}
def handle_ctrl_c(signal, frame):
# TODO: test the behaviour on Windows
# close down anything that should be closed before exiting
before_quit()
sys.exit(130) # 130 is standard exit code for C
def nthwords2int(nthword):
"""Takes an "nth-word" (eg 3rd, 21st, 28th) strips off the ordinal ending
and returns the pure number."""
def print_warning(*objs):
print(fore.YELLOW + 'WARNING: ', end='', file=sys.stderr)
print_err(*objs)
print(style.RESET, end='', file=sys.stderr)
if self.verbose:
if self.progress_total_items and self.is_tty:
p_text = ''
p_length = self.LEN(p_text)
end_point = self.LEN(c) - self.LEN(eta_c)
break_point = round(end_point * self.progress_current_item / self.progress_total_items)
# see a full list of color codes: https://gitlab.com/dslackw/colored
if p_length >= break_point:
sys.stderr.write(back.CYAN + fore.BLACK + c[:break_point] + \
back.GREY_30 + fore.WHITE + c[break_point:end_point] + \
back.CYAN + fore.CYAN + c[end_point] + \
back.GREY_50 + fore.LIGHT_CYAN + c[end_point:] + \
style.RESET)
else:
sys.stderr.write(back.CYAN + fore.BLACK + c[:break_point - p_length] + \
back.SALMON_1 + fore.BLACK + p_text + \
back.GREY_30 + fore.WHITE + c[break_point:end_point] + \
back.GREY_50 + fore.LIGHT_CYAN + c[end_point:] + \
style.RESET)
sys.stderr.flush()
else:
sys.stderr.write(back.CYAN + fore.BLACK + c + style.RESET)
sys.stderr.flush()
off = args.off
microservice_name = args.microservice_name
volume = args.volume
if off:
path = get_armada_develop_env_file_path()
if os.path.exists(path):
os.unlink(path)
return
current_dir_name = os.path.basename(os.getcwd())
if current_dir_name != microservice_name:
print(style.BOLD + fore.YELLOW
+ 'WARNING: Current working directory name "{}" does not match '
'microservice name "{}".'.format(current_dir_name, microservice_name)
+ style.RESET)
save_dev_env_vars(microservice_name, volume)
def __init__(self, parent):
super(wx.richtext.RichTextCtrl, self).__init__(parent, -1, "", style=wx.richtext.RE_MULTILINE | wx.richtext.RE_READONLY)
self.regex_urls=re.compile(r'\b((?:file://|https?://|mailto:)[^][\s<>|]*)')
self.url_colour = wx.Colour(0,0,255)
self.esc = colored.style.ESC
self.end = colored.style.END
self.noop = lambda *args, **kwargs: None
self.actionsMap = {
colored.style.BOLD: self.BeginBold,
colored.style.RES_BOLD: self.EndBold,
colored.style.UNDERLINED: self.BeginUnderline,
colored.style.RES_UNDERLINED: self.EndUnderline,
colored.style.RESET: self.EndAllStyles,
}
# Actions for coloring text
for index, hex in enumerate(kColorList):
escSeq = '{}{}{}'.format(colored.fore.ESC, index, colored.fore.END)
wxcolor = wx.Colour(int(hex[1:3],16), int(hex[3:5],16), int(hex[5:],16), alpha=wx.ALPHA_OPAQUE)
# NB : we use a default parameter to force the evaluation of the binding
self.actionsMap[escSeq] = lambda bindedColor=wxcolor: self.BeginTextColour(bindedColor)
def notify_about_detected_dev_environment(image_name):
if is_armada_develop_on() and os.environ.get('MICROSERVICE_NAME') == image_name:
print(style.BOLD + fore.GREEN
+ 'INFO: Detected development environment for microservice "{}".'.format(image_name)
+ style.RESET)