Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def prepare_postgres(self):
dirname = "postgres"
self.client.volumes.create(constants.POSTGRES_DB_VOLUME, labels={"carrier": "postgres"})
self.volumes_piece += f"\n {constants.POSTGRES_DB_VOLUME}:\n external: true"
makedirs(path.join(constants.WORKDIR, dirname), exist_ok=True)
with open(path.join(constants.WORKDIR, dirname, "Dockerfile"), "w") as f:
f.write(constants.POSTGRESFILE)
with open(path.join(constants.WORKDIR, dirname, constants.POSTGRES_ENTRYPOINT_FILENAME), "w") as f:
f.write(constants.POSTGRES_ENTRYPOINT)
self.postgres_piece = constants.POSTGRES_COMPOSE.format(
path=path.join(constants.WORKDIR, dirname),
volume=constants.POSTGRES_DB_VOLUME
)
def prepare_influx(self):
self.client.volumes.create(constants.INFLUX_VOLUME_NAME, labels={"carrier": "influx"})
self.volumes_piece += f'\n {constants.INFLUX_VOLUME_NAME}:\n external: true'
makedirs(path.join(constants.WORKDIR, 'influx'), exist_ok=True)
with open(path.join(constants.WORKDIR, 'influx', "Dockerfile"), "w") as f:
f.write(constants.INFLUXFILE)
with open(path.join(constants.WORKDIR, 'influx', "influxdb.conf"), "w") as f:
f.write(constants.INFLUX_CONF)
self.influx_piece = constants.INFLUX_COMPOSE.format(path=path.join(constants.WORKDIR, 'influx'),
volume=constants.INFLUX_VOLUME_NAME)
def seed_grafana_dashboards(self):
self._seed_data_to_influx(constants.GRAFANA_URL.format(host=self.data['dns']), constants.GRAFANA_DASHBOARDS)
def prepare_postgres(self):
dirname = "postgres"
self.client.volumes.create(constants.POSTGRES_DB_VOLUME, labels={"carrier": "postgres"})
self.volumes_piece += f"\n {constants.POSTGRES_DB_VOLUME}:\n external: true"
makedirs(path.join(constants.WORKDIR, dirname), exist_ok=True)
with open(path.join(constants.WORKDIR, dirname, "Dockerfile"), "w") as f:
f.write(constants.POSTGRESFILE)
with open(path.join(constants.WORKDIR, dirname, constants.POSTGRES_ENTRYPOINT_FILENAME), "w") as f:
f.write(constants.POSTGRES_ENTRYPOINT)
self.postgres_piece = constants.POSTGRES_COMPOSE.format(
path=path.join(constants.WORKDIR, dirname),
volume=constants.POSTGRES_DB_VOLUME
)
def prepare_influx(self):
self.client.volumes.create(constants.INFLUX_VOLUME_NAME, labels={"carrier": "influx"})
self.volumes_piece += f'\n {constants.INFLUX_VOLUME_NAME}:\n external: true'
makedirs(path.join(constants.WORKDIR, 'influx'), exist_ok=True)
with open(path.join(constants.WORKDIR, 'influx', "Dockerfile"), "w") as f:
f.write(constants.INFLUXFILE)
with open(path.join(constants.WORKDIR, 'influx', "influxdb.conf"), "w") as f:
f.write(constants.INFLUX_CONF)
self.influx_piece = constants.INFLUX_COMPOSE.format(path=path.join(constants.WORKDIR, 'influx'),
volume=constants.INFLUX_VOLUME_NAME)
def prepare_grafana(self):
self.client.volumes.create(constants.GRAFANA_VOLUME_NAME, labels={"carrier": "grafana"})
self.volumes_piece += f'\n {constants.GRAFANA_VOLUME_NAME}:\n external: true'
makedirs(path.join(constants.WORKDIR, 'grafana'), exist_ok=True)
self.grafana_piece = constants.GRAFANA_COMPOSE.format(volume=constants.GRAFANA_VOLUME_NAME,
password=self.data['grafana_password'],
host=self.data['dns'])
def seed_influx_dbs(self):
self._seed_data_to_influx(constants.DATASOURCES_HOST.format(host=self.data['dns']), constants.DATASOURCES)
def prepare_postgres(self):
dirname = "postgres"
self.client.volumes.create(constants.POSTGRES_DB_VOLUME, labels={"carrier": "postgres"})
self.volumes_piece += f"\n {constants.POSTGRES_DB_VOLUME}:\n external: true"
makedirs(path.join(constants.WORKDIR, dirname), exist_ok=True)
with open(path.join(constants.WORKDIR, dirname, "Dockerfile"), "w") as f:
f.write(constants.POSTGRESFILE)
with open(path.join(constants.WORKDIR, dirname, constants.POSTGRES_ENTRYPOINT_FILENAME), "w") as f:
f.write(constants.POSTGRES_ENTRYPOINT)
self.postgres_piece = constants.POSTGRES_COMPOSE.format(
path=path.join(constants.WORKDIR, dirname),
volume=constants.POSTGRES_DB_VOLUME
)
def prepare_influx(self):
self.client.volumes.create(constants.INFLUX_VOLUME_NAME, labels={"carrier": "influx"})
self.volumes_piece += f'\n {constants.INFLUX_VOLUME_NAME}:\n external: true'
makedirs(path.join(constants.WORKDIR, 'influx'), exist_ok=True)
with open(path.join(constants.WORKDIR, 'influx', "Dockerfile"), "w") as f:
f.write(constants.INFLUXFILE)
with open(path.join(constants.WORKDIR, 'influx', "influxdb.conf"), "w") as f:
f.write(constants.INFLUX_CONF)
self.influx_piece = constants.INFLUX_COMPOSE.format(path=path.join(constants.WORKDIR, 'influx'),
volume=constants.INFLUX_VOLUME_NAME)
def create_databases(self):
influx_container = self.client.containers.list(filters={"label": "carrier=influx"})[0]
for db in constants.INFLUX_DATABASES:
influx_container.exec_run(constants.INFLUX_CREATEDB_COMMAND.format(db=db))