Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_webdav_url():
if config.get('ssl', 'privatekey'):
protocol = 'https'
else:
protocol = 'http'
hostname = (config.get('webdav', 'hostname')
or unicode(socket.getfqdn(), 'utf8'))
hostname = '.'.join(encodings.idna.ToASCII(part) for part in
hostname.split('.'))
return urlparse.urlunsplit((protocol, hostname,
urllib.quote(
Transaction().cursor.database_name.encode('utf-8') + '/'),
None, None))
def list(self, hostname=None):
res = []
listdir = [':memory:']
try:
listdir += os.listdir(config.get('database', 'path'))
except OSError:
pass
for db_file in listdir:
if db_file.endswith('.sqlite') or db_file == ':memory:':
if db_file == ':memory:':
db_name = ':memory:'
else:
db_name = db_file[:-7]
try:
database = Database(db_name).connect()
except Exception:
logger.debug(
'Test failed for "%s"', db_name, exc_info=True)
continue
if database.test(hostname=hostname):
res.append(db_name)
def setmany(self, data, prefix=''):
return [self.set(d, prefix) for d in data]
def _filename(self, id, prefix):
path = os.path.normpath(config.get('database', 'path'))
filename = os.path.join(path, prefix, id[0:2], id[2:4], id)
filename = os.path.normpath(filename)
if not filename.startswith(path):
raise ValueError('Bad prefix')
return filename
def _id(self, data):
return hashlib.md5(data).hexdigest()
if config.get('database', 'class'):
FileStore = resolve(config.get('database', 'class')) # noqa: F811
filestore = FileStore()
from trytond.transaction import Transaction
from .resource import ResourceMixin, resource_copy
__all__ = ['AttachmentCopyMixin']
def firstline(description):
try:
return next((x for x in description.splitlines() if x.strip()))
except StopIteration:
return ''
if config.getboolean('attachment', 'filestore', default=True):
file_id = 'file_id'
store_prefix = config.get('attachment', 'store_prefix', default=None)
else:
file_id = None
store_prefix = None
class Attachment(ResourceMixin, ModelSQL, ModelView):
"Attachment"
__name__ = 'ir.attachment'
name = fields.Char('Name', required=True)
type = fields.Selection([
('data', 'Data'),
('link', 'Link'),
], 'Type', required=True)
description = fields.Text('Description')
summary = fields.Function(fields.Char('Summary'), 'on_change_with_summary')
link = fields.Char('Link', states={
def _connection_params(cls, name):
uri = parse_uri(config.get('database', 'uri'))
params = {
'dbname': name,
}
if uri.username:
params['user'] = uri.username
if uri.password:
params['password'] = urllib.parse.unquote_plus(uri.password)
if uri.hostname:
params['host'] = uri.hostname
if uri.port:
params['port'] = uri.port
return params
except ImportError:
from http import client as HTTPStatus
from werkzeug.exceptions import abort
from werkzeug.utils import redirect
from werkzeug.wrappers import Response
from trytond.i18n import gettext
from trytond.config import config
from trytond.wsgi import app
from trytond.protocols.jsonrpc import JSONDecoder
from trytond.protocols.wrappers import with_pool, with_transaction
from trytond.tools import slugify
from trytond.transaction import Transaction
SOURCE = config.get(
'html', 'src', default='https://cloud.tinymce.com/stable/tinymce.min.js')
def get_token(record):
return str((record.write_date or record.create_date).timestamp())
def get_config(names, section='html', default=None):
names = names[:]
while names:
value = config.get(section, '-'.join(names))
if value is not None:
return value
names = names[:-1]
return default
def get_relation_fields(cls):
if not config.get('dict', cls.__name__, default=True):
return {}
fields = cls._relation_fields_cache.get(cls.__name__)
if fields is not None:
return fields
keys = cls.get_keys(cls.search([]))
fields = {k['name']: k for k in keys}
cls._relation_fields_cache.set(cls.__name__, fields)
return fields
def get_connection(self, autocommit=False, readonly=False):
conv = MySQLdb.converters.conversions.copy()
conv[float] = lambda value, _: repr(value)
conv[MySQLdb.constants.FIELD_TYPE.TIME] = MySQLdb.times.Time_or_None
args = {
'db': self.name,
'sql_mode': 'traditional,postgresql',
'use_unicode': True,
'charset': 'utf8',
'conv': conv,
}
uri = parse_uri(config.get('database', 'uri'))
if uri.hostname:
args['host'] = uri.hostname
if uri.port:
args['port'] = uri.port
if uri.username:
args['user'] = uri.username
if uri.password:
args['passwd'] = urllib.unquote_plus(uri.password)
conn = MySQLdb.connect(**args)
cursor = conn.cursor()
cursor.execute('SET time_zone = "+00:00"')
return conn
from trytond import backend
from trytond.wsgi import app
from trytond.transaction import Transaction
from trytond.protocols.jsonrpc import JSONEncoder, JSONDecoder
from trytond.config import config
from trytond.tools import resolve
logger = logging.getLogger(__name__)
_db_timeout = config.getint('database', 'timeout')
_cache_timeout = config.getint('bus', 'cache_timeout')
_select_timeout = config.getint('bus', 'select_timeout')
_long_polling_timeout = config.getint('bus', 'long_polling_timeout')
_allow_subscribe = config.getboolean('bus', 'allow_subscribe')
_url_host = config.get('bus', 'url_host')
_web_cache_timeout = config.getint('web', 'cache_timeout')
class _MessageQueue:
Message = collections.namedtuple('Message', 'channel content timestamp')
def __init__(self, timeout):
super().__init__()
self._lock = threading.Lock()
self._timeout = timeout
self._messages = []
def append(self, channel, element):
self._messages.append(
self.Message(channel, element, time.time()))