Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if not hasattr(task, 'post'):
raise fe.Invalid('"%s" is not a task.' % value, value, state)
return task
class UserValidator(fev.FancyValidator):
    """Validator that turns a username into the matching user object."""

    def _to_python(self, value, state):
        """Return the result of ``M.User.by_username(value)``.

        Raises ``fe.Invalid`` with the message 'Invalid username' when the
        lookup yields a falsy result.
        """
        # The model package is imported at call time rather than module load.
        from allura import model as M
        found = M.User.by_username(value)
        if found:
            return found
        raise fe.Invalid('Invalid username', value, state)
class AnonymousValidator(fev.FancyValidator):
    """Validator that refuses a truthy value when the current user is anonymous."""

    def _to_python(self, value, state):
        """Return ``value`` for a logged-in user; raise ``fe.Invalid`` otherwise.

        A falsy ``value`` is not checked at all and yields ``None``.
        """
        from allura.model import User
        if not value:
            # Nothing was requested, so there is nothing to authorize.
            return None
        if c.user == User.anonymous():
            raise fe.Invalid('Log in to Mark as Private', value, state)
        return value
class PathValidator(fev.FancyValidator):
def _to_python(self, value, state):
from allura import model as M
parts = value.strip('/').split('/')
from __future__ import unicode_literals, print_function
import re
from decimal import Decimal
import formencode
from google.appengine.ext import ndb
from google.appengine.ext.db import BadValueError
__all__ = ['DecimalString', 'slug_validator', 'email_validator',
'SlugProperty', 'EmailProperty', 'DecimalProperty']
class DecimalString(formencode.validators.FancyValidator):
""" Custom validator for handling decimal values """
min = 0
max = 999999999
precision = 12
numeric_re = re.compile(r'^-?\d+(\.\d+)?$')
messages = {
'too_small': 'Number cannot be smaller than %s' % min,
'too_large': 'Number cannot be larger than %s' % max,
'not_a_number': '%(nan)s is not a numeric value',
}
def __init__(self, min=None, max=None, precision=None, *args, **kwargs):
if min is not None:
self.min = min
if max is not None:
var lastInput = addButton.getPrevious();
var fullname = lastInput.get('name');
var sepindex = fullname.indexOf('-') + 1;
var name = fullname.substr(0, sepindex);
var nextNum = fullname.substr(sepindex).toInt() + 1;
var el = new Element('input', {
'type': 'text',
'name': name + nextNum,
'class': 'textfield repeatedtextfield rtmp-server-uri'
});
el.inject(lastInput, 'after').focus();
});
});
""", location='headbottom')
class RTMPURLValidator(FancyValidator):
    """Validator accepting only values that start with ``rtmp://``."""

    def _to_python(self, value, state=None):
        """Return ``value`` with trailing slashes removed.

        Raises ``Invalid`` when ``value`` does not begin with ``rtmp://``.
        """
        has_rtmp_scheme = value.startswith('rtmp://')
        if not has_rtmp_scheme:
            raise Invalid(_('RTMP server URLs must begin with rtmp://'),
                          value, state)
        return value.rstrip('/')
class RemoteURLStorageForm(StorageForm):
event = events.Admin.Storage.RemoteURLStorageForm
fields = StorageForm.fields + [
ListFieldSet('rtmp',
legend=N_('RTMP Servers:'),
suppress_label=True,
children=[
# FIXME: Display errors from the RTMPURLValidator
FormFieldRepeater('known_servers',
if not person:
return dict(isOk=0)
# Reset account
c.password = store.makeRandomAlphaNumericString(parameter.PASSWORD_LENGTH_AVERAGE)
return changeAccount(dict(
username=person.username,
password=c.password,
nickname=person.nickname,
email=person.email,
email_sms=person.email_sms,
), 'reset', '/people/confirm.mako', person)
# Validators
class Unique(formencode.validators.FancyValidator):
    'Validator to ensure unique values in a field'

    def __init__(self, fieldName, errorMessage):
        'Store fieldName and errorMessage'
        super(Unique, self).__init__()
        self.fieldName = fieldName
        self.errorMessage = errorMessage

    def _to_python(self, value, person):
        '''Check whether the value is unique and return it unchanged.

        ``person`` is passed in the validator's state slot: a falsy value
        means a new person, otherwise it is the object whose ``fieldName``
        attribute holds the current value.

        Raises ``formencode.Invalid`` with ``errorMessage`` when another
        ``model.Person`` row already has ``value`` in ``fieldName``.
        '''
        # If the person is new or the value changed,
        if not person or getattr(person, self.fieldName) != value:
            # Make sure the value is unique
            column = getattr(model.Person, self.fieldName)
            if Session.query(model.Person).filter(column == value).first():
                # Raise
                raise formencode.Invalid(self.errorMessage, value, person)
        # Hand the accepted value back; without this the validated field
        # would be converted to None.
        return value
from webob import exc
from pymongo.errors import DuplicateKeyError
from paste.deploy.converters import asint, aslist
from allura.app import AdminControllerMixin
from allura.controllers import BaseController
from allura.lib import helpers as h
from allura.lib.decorators import require_post, task
from allura.lib.utils import DateJSONEncoder
from allura import model as M
log = logging.getLogger(__name__)
class WebhookValidator(fev.FancyValidator):
def __init__(self, sender, app, **kw):
self.app = app
self.sender = sender
super(WebhookValidator, self).__init__(**kw)
def _to_python(self, value, state):
wh = None
if isinstance(value, M.Webhook):
wh = value
elif isinstance(value, ObjectId):
wh = M.Webhook.query.get(_id=value)
else:
try:
wh = M.Webhook.query.get(_id=ObjectId(value))
except:
pass
break
return ago + ' ago'
def ago_ts(timestamp):
    """Apply ``ago`` to a numeric timestamp.

    The timestamp is first converted with ``datetime.utcfromtimestamp``.
    """
    moment = datetime.utcfromtimestamp(timestamp)
    return ago(moment)
def ago_string(s):
    """Apply ``ago`` to a date given as a string.

    The string is parsed with ``parse(s, ignoretz=True)``. Returns the
    literal 'unknown' when parsing or formatting raises ``ValueError``,
    ``AttributeError`` or ``TypeError``.
    """
    try:
        moment = parse(s, ignoretz=True)
        return ago(moment)
    except (ValueError, AttributeError, TypeError):
        return 'unknown'
class DateTimeConverter(FancyValidator):
    """Validator converting date strings via ``parse`` and back via ``isoformat``.

    NOTE(review): ``parse`` is presumably ``dateutil.parser.parse`` -- confirm
    against the module's imports.
    """

    def _to_python(self, value, state):
        """Return ``parse(value)``.

        On ``ValueError``/``TypeError`` the configured ``if_invalid`` is
        returned when one is set; otherwise the original exception is
        re-raised.
        """
        try:
            return parse(value)
        except (ValueError, TypeError):
            fallback = self.if_invalid
            if fallback != formencode.api.NoDefault:
                return fallback
            raise

    def _from_python(self, value, state):
        """Return ``value.isoformat()``."""
        return value.isoformat()
def absurl(url):
"""
mod = __import__(mod, fromlist=[str(func)])
except ImportError:
raise fe.Invalid('Could not import "%s"' % value, value, state)
try:
task = getattr(mod, func)
except AttributeError:
raise fe.Invalid('Module has no attribute "%s"' %
func, value, state)
if not hasattr(task, 'post'):
raise fe.Invalid('"%s" is not a task.' % value, value, state)
return task
class UserValidator(fev.FancyValidator):
    """Validator that turns a username into the matching user object."""

    def _to_python(self, value, state):
        """Return the result of ``M.User.by_username(value)``.

        Raises ``fe.Invalid`` with the message 'Invalid username' when the
        lookup yields a falsy result.
        """
        # The model package is imported at call time rather than module load.
        from allura import model as M
        found = M.User.by_username(value)
        if found:
            return found
        raise fe.Invalid('Invalid username', value, state)
class AnonymousValidator(fev.FancyValidator):
def _to_python(self, value, state):
from allura.model import User
if value:
if c.user == User.anonymous():
raise fe.Invalid('Log in to Mark as Private', value, state)
def csv_parser(page):
    """Extract the data values from a CSV listing page.

    ``page`` is any object providing ``readlines()``. The first line is
    treated as the CSV header and dropped. A final line that does not start
    with a double quote is treated as a "next page" footer and dropped too.

    Returns the remaining lines with surrounding quotes, commas and newlines
    stripped, or ``[]`` when the page has no data lines.
    """
    # skip CSV header
    lines = page.readlines()[1:]
    # An empty or header-only page has no last line to inspect; indexing
    # lines[-1] on it would raise IndexError.
    if not lines:
        return []
    # skip "next page here" info footer
    if not lines[-1].startswith('"'):
        lines.pop()
    # remove CSV wrapping (quotes, commas, newlines)
    return [line.strip('",\n') for line in lines]
class GoogleCodeProjectNameValidator(fev.FancyValidator):
not_empty = True
messages = {
'invalid': 'Please enter a project URL, or a project name containing '
'only letters, numbers, and dashes.',
'unavailable': 'This project is unavailable for import',
}
def _to_python(self, value, state=None):
project_name_re = re.compile(r'^[a-z0-9][a-z0-9-]{,61}$')
if project_name_re.match(value):
# just a name
project_name = value
else:
# try as a URL
project_name = None
project_name_simple = None
value, state)
class CreateTaskSchema(fe.Schema):
    """Form schema with the fields ``task``, ``task_args``, ``user`` and ``path``."""

    # Required; whitespace is stripped before validation.
    task = TaskValidator(strip=True, not_empty=True)
    # Falls back to empty positional and keyword arguments when not submitted.
    task_args = JsonConverter(if_missing=dict(args=[], kwargs={}))
    # Optional; None when not submitted.
    user = UserValidator(if_missing=None, strip=True)
    # Optional; an empty dict when not submitted or submitted empty.
    path = PathValidator(if_empty={}, if_missing={}, strip=True)
class CreateSiteNotificationSchema(fe.Schema):
    """Form schema for the fields of a site notification."""

    # Boolean flag; False when not submitted.
    active = fev.StringBool(if_missing=False)
    # Required integer.
    impressions = fev.Int(not_empty=True)
    # Required text.
    content = fev.UnicodeString(not_empty=True)
    # The three fields below are optional and become None when left empty.
    user_role = fev.FancyValidator(if_empty=None, not_empty=False)
    page_regex = fev.FancyValidator(if_empty=None, not_empty=False)
    page_tool_type = fev.FancyValidator(if_empty=None, not_empty=False)
class DateValidator(fev.FancyValidator):
    """Validator that converts its input with ``convertDate``."""

    def _to_python(self, value, state):
        """Return ``convertDate(value)``.

        Raises ``fe.Invalid`` when the conversion result is falsy. The error
        carries the submitted ``value`` so callers can show the user what
        they typed.
        """
        # Keep the conversion result separate from the input: rebinding
        # ``value`` would make the error below report the failed (falsy)
        # conversion result instead of the original input.
        converted = convertDate(value)
        if not converted:
            raise fe.Invalid(
                "Please enter a valid date in the format DD/MM/YYYY.",
                value, state)
        return converted
class TimeValidator(fev.FancyValidator):
from decimal import Decimal, DecimalException
import sys
from dateutil.parser import parse
import formencode
import formencode.validators as fev
import formencode.national
import sqlalchemy as sa
from savalidation._internal import is_iterable
import six
_ELV = '_sav_entity_linkers'
class BaseValidator(fev.FancyValidator):
    """FancyValidator base that re-registers methods defined under old names."""

    def __classinit__(cls, new_attrs):
        """Move deprecated method names in ``new_attrs`` to their new names.

        ``_deprecated_methods`` is read from the class, falling back to
        ``new_attrs``; it is iterated as ``(old, new)`` name pairs. Each
        method found under an old name is removed from ``new_attrs``, set on
        the class under the new name, and stored back under the new name.
        The result of ``FancyValidator.__classinit__`` is returned.
        """
        renames = getattr(cls, '_deprecated_methods', None)
        if not renames:
            renames = new_attrs.get('_deprecated_methods')
        if renames is not None:
            for old_name, new_name in renames:
                if old_name not in new_attrs:
                    continue
                method = new_attrs.pop(old_name)
                setattr(cls, new_name, method)
                new_attrs[new_name] = method
        return fev.FancyValidator.__classinit__(cls, new_attrs)
class NumericValidator(BaseValidator):
def __init__(self, places, prec):
self.places = places
self.prec = prec