Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
return result
def _construct_template(self):
    """Build the prompt template for a comma-separated list field.

    The result is stored on ``self.template``. It always starts with
    ``{0} (comma-separated list)``; the ``{0}`` placeholder is left
    unformatted here. When ``self.spec`` has a ``default`` key, its items
    are appended as ``[a,b,c]: ``; otherwise only ``: `` is appended.
    """
    template = u'{0} (comma-separated list)'

    if 'default' in self.spec:
        # The default may hold non-string items (e.g. integers) or be None;
        # str.join only accepts text, so render every item explicitly.
        default_items = self.spec.get('default') or ()
        rendered = u','.join(u'{}'.format(item) for item in default_items)
        template += u' [{default}]: '.format(default=rendered)
    else:
        template += u': '

    self.template = template
def _transform_response(self, response):
    """Split the raw answer on commas and return the whitespace-trimmed pieces."""
    pieces = []
    for chunk in response.split(','):
        pieces.append(chunk.strip())
    return pieces
class ArrayObjectReader(StringReader):
@staticmethod
def condition(spec):
return spec.get('type', None) == 'array' and spec.get('items', {}).get('type') == 'object'
def read(self):
results = []
properties = self.spec.get('items', {}).get('properties', {})
message = '~~~ Would you like to add another item to "%s" array / list?' % self.name
is_continue = True
index = 0
while is_continue:
prefix = u'{name}[{index}].'.format(name=self.name, index=index)
results.append(
InteractiveForm(properties, prefix=prefix, reraise=True).initiate_dialog()
)
super(SecretStringReader, self).__init__(*args, secret=True, **kwargs)
@staticmethod
def condition(spec):
return spec.get('secret', None)
def _construct_template(self):
    """Build the prompt template for a secret field and store it on ``self.template``.

    The template starts with ``{0} (secret)`` (placeholder left unformatted
    here) and ends with either `` [<default>]: `` when the spec has a
    ``default`` key, or ``: `` otherwise.
    """
    # NOTE(review): the default is rendered verbatim into the prompt text;
    # confirm that is acceptable for secret values.
    if 'default' in self.spec:
        suffix = u' [{default}]: '.format(default=self.spec.get('default'))
    else:
        suffix = u': '
    self.template = u'{0} (secret)' + suffix
class EnumReader(StringReader):
@staticmethod
def condition(spec):
return spec.get('enum', None)
@staticmethod
def validate(input, spec):
if not input and (not spec.get('required', None) or spec.get('default', None)):
return
if not input.isdigit():
raise validation.ValidationError(len(input), 'Not a number')
enum = spec.get('enum')
try:
enum[int(input)]
except IndexError:
super(NumberReader, NumberReader).validate(input, spec)
def _construct_template(self):
    """Build the prompt template for a float field and store it on ``self.template``.

    The template starts with ``{0} (float)`` (placeholder left unformatted
    here) and ends with either `` [<default>]: `` when the spec has a
    ``default`` key, or ``: `` otherwise.
    """
    if 'default' in self.spec:
        suffix = u' [{default}]: '.format(default=self.spec.get('default'))
    else:
        suffix = u': '
    self.template = u'{0} (float)' + suffix
def _transform_response(self, response):
    """Convert the raw answer to a ``float``."""
    number = float(response)
    return number
class IntegerReader(StringReader):
@staticmethod
def condition(spec):
return spec.get('type', None) == 'integer'
@staticmethod
def validate(input, spec):
    """Check that a non-empty answer parses as an integer, then run the parent validation.

    A non-empty ``input`` is converted with ``int``; a conversion failure is
    re-raised as ``validation.ValidationError`` carrying the length of the raw
    input and the error text. The (possibly converted) value is then handed
    to the parent class's ``validate`` together with ``spec``.
    """
    value = input
    if input:
        try:
            value = int(input)
        except ValueError as err:
            raise validation.ValidationError(len(input), six.text_type(err))

    super(IntegerReader, IntegerReader).validate(value, spec)
def _construct_template(self):
self.template = u'{0} (integer)'
else:
self.template += u': '
def _transform_response(self, response):
    """Map the lower-cased answer onto ``True`` or ``False``.

    Returns ``True`` when the answer is in ``POSITIVE_BOOLEAN`` and ``False``
    when it is in ``NEGATIVE_BOOLEAN``.

    Raises:
        OperationFailureException: the answer is in neither collection.
    """
    answer = response.lower()

    if answer in POSITIVE_BOOLEAN:
        return True
    if answer in NEGATIVE_BOOLEAN:
        return False

    # Only reachable when the answer matched neither collection.
    raise OperationFailureException(
        'Response neither positive no negative. ' 'Value have not been properly validated.'
    )
class NumberReader(StringReader):
@staticmethod
def condition(spec):
return spec.get('type', None) == 'number'
@staticmethod
def validate(input, spec):
    """Check that a non-empty answer parses as a float, then run the parent validation.

    A non-empty ``input`` is converted with ``float``; a conversion failure is
    re-raised as ``validation.ValidationError`` carrying the length of the raw
    input and the error text. The (possibly converted) value is then handed
    to the parent class's ``validate`` together with ``spec``.
    """
    value = input
    if input:
        try:
            value = float(input)
        except ValueError as err:
            raise validation.ValidationError(len(input), six.text_type(err))

    super(NumberReader, NumberReader).validate(value, spec)
def _construct_template(self):
self.template = u'{0} (float)'
class ObjectReader(StringReader):
    """Reader for fields whose spec type is ``object``."""

    @staticmethod
    def condition(spec):
        """Tell whether ``spec`` declares the ``object`` type."""
        declared_type = spec.get('type')
        return declared_type == 'object'

    def read(self):
        """Run a nested ``InteractiveForm`` over the spec's ``properties``.

        The sub-form receives ``<name>.`` as its prefix and ``reraise=True``;
        whatever its ``initiate_dialog()`` returns is returned unchanged.
        """
        nested_prefix = u'{}.'.format(self.name)
        nested_schema = self.spec.get('properties', {})
        form = InteractiveForm(nested_schema, prefix=nested_prefix, reraise=True)
        return form.initiate_dialog()
class ArrayReader(StringReader):
    """Reader for fields whose spec type is ``array``."""

    @staticmethod
    def condition(spec):
        """Tell whether ``spec`` declares the ``array`` type."""
        declared_type = spec.get('type')
        return declared_type == 'array'

    @staticmethod
    def validate(input, spec):
        """Validate every token of the answer against the spec's ``items`` sub-spec.

        An empty answer is accepted outright when the field is not required
        or has a truthy default. Otherwise each run of characters other than
        commas and spaces is passed to ``StringReader.validate``; a failure is
        re-raised with the token's start offset in place of the original
        position.
        """
        if not input:
            optional = not spec.get('required', None)
            if optional or spec.get('default', None):
                return

        item_spec = spec.get('items', {})
        # Tokens are delimited by commas *and* spaces.
        for match in re.finditer(r'[^, ]+', input):
            try:
                StringReader.validate(match.group(), item_spec)
            except validation.ValidationError as err:
                raise validation.ValidationError(match.start(), six.text_type(err))
super(IntegerReader, IntegerReader).validate(input, spec)
def _construct_template(self):
    """Build the prompt template for an integer field and store it on ``self.template``.

    The template starts with ``{0} (integer)`` (placeholder left unformatted
    here) and ends with either `` [<default>]: `` when the spec has a
    ``default`` key, or ``: `` otherwise.
    """
    if 'default' in self.spec:
        suffix = u' [{default}]: '.format(default=self.spec.get('default'))
    else:
        suffix = u': '
    self.template = u'{0} (integer)' + suffix
def _transform_response(self, response):
    """Convert the raw answer to an ``int``."""
    number = int(response)
    return number
class SecretStringReader(StringReader):
    """String reader that always passes ``secret=True`` to the base constructor."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the base reader, adding ``secret=True``."""
        super(SecretStringReader, self).__init__(*args, secret=True, **kwargs)

    @staticmethod
    def condition(spec):
        """Return the value stored under ``secret`` in ``spec`` (``None`` when absent)."""
        secret_flag = spec.get('secret')
        return secret_flag

    def _construct_template(self):
        """Build the prompt template and store it on ``self.template``.

        The template starts with ``{0} (secret)`` (placeholder left
        unformatted here) and ends with either `` [<default>]: `` when the
        spec has a ``default`` key, or ``: `` otherwise.
        """
        # NOTE(review): the default is rendered verbatim into the prompt
        # text; confirm that is acceptable for secret values.
        if 'default' in self.spec:
            suffix = u' [{default}]: '.format(default=self.spec.get('default'))
        else:
            suffix = u': '
        self.template = u'{0} (secret)' + suffix
if num_options > 3:
num_options = 3
more = '...'
options = [str(i) for i in range(0, num_options)]
self.template += u'\nChoose from {}{}'.format(', '.join(options), more)
if 'default' in self.spec:
self.template += u' [{}]: '.format(enum.index(self.spec.get('default')))
else:
self.template += u': '
def _transform_response(self, response):
    """Return the ``enum`` entry of the spec at the index given by the answer."""
    choices = self.spec.get('enum')
    position = int(response)
    return choices[position]
class ObjectReader(StringReader):
    """Reader for fields whose spec type is ``object``."""

    @staticmethod
    def condition(spec):
        """Tell whether ``spec`` declares the ``object`` type."""
        declared_type = spec.get('type')
        return declared_type == 'object'

    def read(self):
        """Run a nested ``InteractiveForm`` over the spec's ``properties``.

        The sub-form receives ``<name>.`` as its prefix and ``reraise=True``;
        whatever its ``initiate_dialog()`` returns is returned unchanged.
        """
        nested_prefix = u'{}.'.format(self.name)
        nested_schema = self.spec.get('properties', {})
        form = InteractiveForm(nested_schema, prefix=nested_prefix, reraise=True)
        return form.initiate_dialog()
class ArrayReader(StringReader):
@staticmethod
for Reader in self.readers:
if Reader.condition(spec):
reader = Reader(field, spec, prefix=self.prefix)
break
if not reader:
raise ReaderNotImplemented('No reader for the field spec')
try:
return reader.read()
except KeyboardInterrupt:
raise DialogInterrupted()
class Question(StringReader):
    """String reader constructed from a message and an optional spec."""

    def __init__(self, message, spec=None):
        """Initialise the base reader with ``message`` and ``spec``.

        A falsy ``spec`` (``None``, empty dict, ...) is replaced by a fresh
        empty dict before being handed to the base constructor.
        """
        super(Question, self).__init__(message, spec or {})
return result
class InteractiveForm(object):
readers = [
EnumReader,
BooleanReader,
NumberReader,
IntegerReader,
ObjectReader,
ArrayEnumReader,
ArrayObjectReader,
ArrayReader,
SecretStringReader,
StringReader,
]
def __init__(self, schema, prefix=None, reraise=False):
    """Store the form configuration on the instance.

    Args:
        schema: kept as ``self.schema``.
        prefix: kept as ``self.prefix``; defaults to ``None``.
        reraise: kept as ``self.reraise``; defaults to ``False``.
    """
    self.schema, self.prefix, self.reraise = schema, prefix, reraise
def initiate_dialog(self):
result = {}
try:
for field in self.schema:
try:
result[field] = self._read_field(field)
except ReaderNotImplemented as e:
print('%s. Skipping...' % six.text_type(e))
def validate(input, spec):
    """Validate every token of the answer against the spec's ``items`` sub-spec.

    An empty answer is accepted outright when the field is not required or
    has a truthy default. Otherwise each run of characters other than commas
    and spaces is passed to ``StringReader.validate``; a failure is re-raised
    with the token's start offset in place of the original position.
    """
    if not input:
        optional = not spec.get('required', None)
        if optional or spec.get('default', None):
            return

    item_spec = spec.get('items', {})
    # Tokens are delimited by commas *and* spaces.
    for match in re.finditer(r'[^, ]+', input):
        try:
            StringReader.validate(match.group(), item_spec)
        except validation.ValidationError as err:
            raise validation.ValidationError(match.start(), six.text_type(err))