Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_adapt_most_specific(self):
    """Adapting an instance uses the adapter of its closest registered base.

    Adapters are registered for the root and the middle class of a
    three-level hierarchy; an instance of the leaf class (which has no
    adapter of its own) is expected to be quoted by the middle class's
    adapter.
    """
    class Root(object):
        pass

    class Middle(Root):
        pass

    class Leaf(Middle):
        pass

    register_adapter(Root, lambda obj: AsIs("a"))
    register_adapter(Middle, lambda obj: AsIs("b"))

    quoted = adapt(Leaf()).getquoted()
    self.assertEqual(b'b', quoted)
#!/usr/local/virtualenv/huesound/bin/python
#!/usr/bin/env python
import sys
sys.path.append("../huesound")
import urllib2
import psycopg2;
import subprocess;
from psycopg2.extensions import register_adapter
from huesound import cube
register_adapter(cube.Cube, cube.adapt_cube)
# Both connections target the same database, so the DSN is spelled out once.
DSN = "dbname=huesound user=huesound host=localhost port=5432"
try:
    # Two independent connections, each with its own cursor.
    # NOTE(review): out_cur is presumably used for writes while cur is being
    # iterated -- confirm against the rest of the script.
    conn = psycopg2.connect(DSN)
    cur = conn.cursor()
    conn2 = psycopg2.connect(DSN)
    out_cur = conn2.cursor()
except psycopg2.OperationalError as err:
    # sys.exit(message) writes the message to stderr and terminates with
    # status 1, so a failed connection is reported as a failure to the
    # calling shell instead of exiting with status 0.
    sys.exit("Cannot connect to database: %s" % err)
# Select the rows whose red component is negative.
cur.execute('SELECT id, icon_url FROM color_rdio_cube WHERE red < 0')
for row in cur:
try:
f = urllib2.urlopen(row[1])
except urllib2.URLError:
import uuid
import psycopg2
import psycopg2.extras
import rapidjson as json
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
psycopg2.extensions.register_adapter(uuid.UUID, psycopg2.extras.UUID_adapter)
def init(env, password=None, reset=False):
if reset:
with env.db_connect(dbname='postgres') as conn:
conn.set_isolation_level(0)
with conn.cursor() as cur:
cur.execute('DROP DATABASE IF EXISTS %s' % env.db_name)
cur.execute('CREATE DATABASE %s' % env.db_name)
sql = '''
CREATE EXTENSION IF NOT EXISTS "pgcrypto";
CREATE OR REPLACE FUNCTION fill_updated()
RETURNS TRIGGER AS $$
BEGIN
IF row(NEW.*) IS DISTINCT FROM row(OLD.*) THEN
if not oids:
oid1 = 2950
oid2 = 2951
elif isinstance(oids, (list, tuple)):
oid1, oid2 = oids
else:
oid1 = oids
oid2 = 2951
_ext.UUID = _ext.new_type((oid1, ), "UUID",
lambda data, cursor: data and uuid.UUID(data) or None)
_ext.UUIDARRAY = _ext.new_array_type((oid2,), "UUID[]", _ext.UUID)
_ext.register_type(_ext.UUID, conn_or_curs)
_ext.register_type(_ext.UUIDARRAY, conn_or_curs)
_ext.register_adapter(uuid.UUID, UUID_adapter)
return _ext.UUID
from psycopg2 import tz # noqa
# Register default adapters.
from psycopg2 import extensions as _ext
# Tuples are adapted with SQL_IN and None with NoneAdapter.
_ext.register_adapter(tuple, _ext.SQL_IN)
_ext.register_adapter(type(None), _ext.NoneAdapter)
# Register the Decimal adapter here instead of in the C layer.
# This way a new class is registered for each sub-interpreter.
# See ticket #52
from decimal import Decimal # noqa
from psycopg2._psycopg import Decimal as Adapter # noqa
_ext.register_adapter(Decimal, Adapter)
# The two names were only needed for the registration above; unbind them
# so they are not left behind in this module's namespace.
del Decimal, Adapter
def connect(dsn=None, connection_factory=None, cursor_factory=None, **kwargs):
"""
Create a new database connection.
The connection parameters can be specified as a string:
conn = psycopg2.connect("dbname=test user=postgres password=secret")
or using a set of keyword arguments:
conn = psycopg2.connect(database="test", user="postgres", password="secret")
Or as a mix of both. The basic connection parameters are:
from django.utils.timezone import utc
from psycopg2_pool import PostgresConnectionPool
try:
    import psycopg2 as Database
    import psycopg2.extensions
except ImportError as e:
    # Report a missing or broken driver as a Django configuration error.
    # The "as" form of the except clause is valid on Python 2.6+ and 3.x,
    # unlike the comma form.
    from django.core.exceptions import ImproperlyConfigured
    raise ImproperlyConfigured("Error loading psycopg2 module: %s" % e)
DatabaseError = Database.DatabaseError
IntegrityError = Database.IntegrityError
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
psycopg2.extensions.register_adapter(SafeString, psycopg2.extensions.QuotedString)
psycopg2.extensions.register_adapter(SafeUnicode, psycopg2.extensions.QuotedString)
logger = getLogger('django.db.backends')
POOLS = {}
def utc_tzinfo_factory(offset):
    """Return the ``utc`` tzinfo for a zero offset.

    Any non-zero ``offset`` raises ``AssertionError``, because it means the
    database connection is not set to UTC.
    """
    has_offset = offset != 0
    if not has_offset:
        return utc
    raise AssertionError("database connection isn't set to UTC")
class CursorWrapper(object):
"""
A thin wrapper around psycopg2's normal cursor class so that we can catch
particular exception instances and reraise them with the right types.
if array_oid is not None:
if isinstance(array_oid, int):
array_oid = (array_oid,)
else:
array_oid = tuple([x for x in array_oid if x])
# create and register the typecaster
if _sys.version_info[0] < 3 and unicode:
cast = HstoreAdapter.parse_unicode
else:
cast = HstoreAdapter.parse
HSTORE = _ext.new_type(oid, "HSTORE", cast)
_ext.register_type(HSTORE, not globally and conn_or_curs or None)
_ext.register_adapter(dict, HstoreAdapter)
if array_oid:
HSTOREARRAY = _ext.new_array_type(array_oid, "HSTOREARRAY", HSTORE)
_ext.register_type(HSTOREARRAY, not globally and conn_or_curs or None)
if array_oid is not None:
if isinstance(array_oid, int):
array_oid = (array_oid,)
else:
array_oid = tuple([x for x in array_oid if x])
# create and register the typecaster
if PY2 and unicode:
cast = HstoreAdapter.parse_unicode
else:
cast = HstoreAdapter.parse
HSTORE = _ext.new_type(oid, "HSTORE", cast)
_ext.register_type(HSTORE, not globally and conn_or_curs or None)
_ext.register_adapter(dict, HstoreAdapter)
if array_oid:
HSTOREARRAY = _ext.new_array_type(array_oid, "HSTOREARRAY", HSTORE)
_ext.register_type(HSTOREARRAY, not globally and conn_or_curs or None)
DeclarativeBase.__str__ = __str__
# IP types handling
def adapt_ipv4network(ipnet):
    """Adapt an IPv4 network into a SQL literal cast to ``cidr``.

    The network is rendered with ``str()``, quoted through ``adapt`` and
    suffixed with ``::cidr``; the finished text is returned wrapped in
    ``AsIs``.
    """
    val = adapt(str(ipnet)).getquoted()
    if not isinstance(val, str):
        # On Python 3 getquoted() yields bytes, and bytes + str raises
        # TypeError. The quoted text of an IPv4 network is plain ASCII, so
        # decoding is lossless. On Python 2 the value is already a str and
        # is left untouched.
        val = val.decode("ascii")
    return AsIs(val + "::cidr")
def adapt_ipv4address(ipnet):
    """Adapt an IPv4 address into a SQL literal cast to ``inet``.

    The address is rendered with ``str()``, quoted through ``adapt`` and
    suffixed with ``::inet``; the finished text is returned wrapped in
    ``AsIs``.
    """
    val = adapt(str(ipnet)).getquoted()
    if not isinstance(val, str):
        # On Python 3 getquoted() yields bytes, and bytes + str raises
        # TypeError. The quoted text of an IPv4 address is plain ASCII, so
        # decoding is lossless. On Python 2 the value is already a str and
        # is left untouched.
        val = val.decode("ascii")
    return AsIs(val + "::inet")
# Outgoing parameters: adapt IPv4Network / IPv4Address values with the
# adapt_ipv4network / adapt_ipv4address functions.
psycopg2.extensions.register_adapter(IPv4Network, adapt_ipv4network)
psycopg2.extensions.register_adapter(IPv4Address, adapt_ipv4address)
# Incoming values: register an array typecaster named "CIDR[]" for OID 651
# whose elements are parsed with psycopg2.STRING.
psycopg2.extensions.register_type(psycopg2.extensions.new_array_type((651,), "CIDR[]", psycopg2.STRING))
# Date types handling
class InfDateAdapter(object):
"""Map datetime.date.min/max values to infinity in Postgres
Taken from: http://initd.org/psycopg/docs/usage.html"""
def __init__(self, wrapped):
self.wrapped = wrapped
def getquoted(self):
if self.wrapped == datetime.date.max:
return b"'infinity'::date"