Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from dataclasses import dataclass, field
from typing import Optional
# Module-level lock obtained from dbt.flags.MP_CONTEXT.
# NOTE(review): presumably guards DROP operations so they do not run
# concurrently -- confirm against the code that acquires it.
drop_lock: Lock = dbt.flags.MP_CONTEXT.Lock()
# Distinct int-based type; a dedicated JSON-schema field encoder is registered
# for it further down in this module.
IAMDuration = NewType('IAMDuration', int)
class IAMDurationEncoder(FieldEncoder):
    """Field encoder supplying the JSON schema used for ``IAMDuration``."""

    @property
    def json_schema(self):
        """Return the schema fragment: an integer in the range 0..65535."""
        return {'type': 'integer', 'minimum': 0, 'maximum': 65535}
# Associate the IAMDuration type with its encoder so JsonSchemaMixin uses
# IAMDurationEncoder's schema for fields annotated with that type.
JsonSchemaMixin.register_field_encoders({IAMDuration: IAMDurationEncoder()})
class RedshiftConnectionMethod(StrEnum):
DATABASE = 'database'
IAM = 'iam'
@dataclass
class RedshiftCredentials(PostgresCredentials):
    """Credentials for a Redshift connection, extending ``PostgresCredentials``.

    Adds the connection-method selector and the IAM-related settings on top
    of whatever the Postgres base class declares.
    """

    # Which connection method to use; defaults to DATABASE.
    method: RedshiftConnectionMethod = RedshiftConnectionMethod.DATABASE
    # Optional here (defaults to None).
    # NOTE(review): presumably only required for the DATABASE method -- confirm.
    password: Optional[str] = None
    # Cluster name, used when authenticating via IAM (see the description
    # string attached as field metadata).
    cluster_id: Optional[str] = field(
        default=None,
        metadata={'description': 'If using IAM auth, the name of the cluster'},
    )
    # Defaults to 900 seconds.
    # NOTE(review): annotated as plain ``int`` although this module defines an
    # ``IAMDuration`` type with its own schema encoder -- confirm whether that
    # is intentional before changing the annotation.
    iam_duration_seconds: int = 900
raise ValidationError('Got invalid NoValue: {}'.format(value))
return NoValue()
@property
def json_schema(self):
    """Return the JSON schema fragment for this encoder.

    The schema describes an object whose ``novalue`` property, when present,
    may only take the literal string ``'novalue'``.
    """
    return {
        'type': 'object',
        'properties': {
            'novalue': {
                'enum': ['novalue'],
            }
        }
    }
# Register one encoder instance per custom field type so JsonSchemaMixin can
# look up the matching encoder by type.
JsonSchemaMixin.register_field_encoders({
    Port: PortEncoder(),
    timedelta: TimeDeltaFieldEncoder(),
    Real: RealEncoder(),
    NoValue: NoValueEncoder(),
})