Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def TemporaryZooKeeper():
    """Yield the address of a freshly created, uniquely named ZooKeeper node.

    Connects to the ZooKeeper server at ``localhost:2181``, creates a node
    named ``pgshovel-test-<uuid1 hex>``, disconnects, and then yields the
    string ``"<host>/<path>"``.

    The client is stopped in a ``finally`` clause so that a failure while
    creating the node does not leave a started client (and its connection)
    behind. The created node itself is not removed by this function.

    NOTE(review): this is a generator; presumably it is wrapped with
    ``contextlib.contextmanager`` outside this excerpt -- confirm.
    """
    host = 'localhost:2181'
    path = 'pgshovel-test-%s' % (uuid.uuid1().hex,)
    zookeeper = KazooClient(host)
    zookeeper.start()
    try:
        zookeeper.create(path)
    finally:
        # Always release the connection, even when create() raises.
        zookeeper.stop()
    yield '/'.join((host, path))
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import config
from global_var import *
from blogging import logger
import config
from kazoo.client import KazooClient
from kazoo.protocol.paths import join
#from kazoo.exceptions import (KazooException, NoNodeException)
# Shared ZooKeeper client, built from the host list in config.zk_hosts.
# It is started right here, so executing these statements opens the
# connection before any of the functions below are called.
zk_client = KazooClient(hosts=config.zk_hosts)
zk_client.start()
#zk_client.add_auth("digest", "test:test")
def getRack():
try:
def watcher(event):
logger.info("/rack children changed, need update memory")
getRack()
zk_client.get('/rack', watcher)
children = zk_client.get_children('/rack')
for child in children:
rack_name = child.encode('utf-8')
RACK_STORE[rack_name] = []
path1 = join('/rack', rack_name)
@classmethod
def __enter__(cls):
    """Return the shared read-only ZooKeeper client, connecting on first use.

    When ``cls.zk`` is ``None`` a new ``KazooClient`` is created for the hosts
    reported by ``load_system_paasta_config().get_zk_hosts()`` and started.
    ``cls.counter`` is incremented on every call.

    The client is stored on the class only after ``start()`` has returned, so
    a failed connection attempt is not cached and the next call retries
    instead of handing out a client that never connected.

    Returns:
        The ``KazooClient`` stored in ``cls.zk``.
    """
    if cls.zk is None:
        client = KazooClient(hosts=load_system_paasta_config().get_zk_hosts(), read_only=True)
        client.start()
        # Publish only once the connection is up.
        cls.zk = client
    cls.counter = cls.counter + 1
    return cls.zk
def _get_zk_conn(hosts):
    """Return the cached ZooKeeper connection, creating it on first use.

    The connection lives in the module global ``ZK_CONNECTION``. It is
    assigned only after ``start()`` has returned, so a failed connection
    attempt is not cached and the next call tries again.

    Args:
        hosts: Host string passed to ``KazooClient`` when the connection is
            first created. It is ignored on later calls, which return the
            already-cached client.

    Returns:
        The started ``KazooClient`` held in ``ZK_CONNECTION``.
    """
    global ZK_CONNECTION
    if ZK_CONNECTION is None:
        client = KazooClient(hosts=hosts)
        client.start()
        # Publish only once the connection is up.
        ZK_CONNECTION = client
    return ZK_CONNECTION
max_tries=3,
delay=0.3,
backoff=1,
max_delay=1,
ignore_expire=False,
)
default_acl = None
auth_data = None
if zk_user and zk_secret:
default_acl = [make_digest_acl(zk_user, zk_secret, all=True)]
scheme = 'digest'
credential = "{}:{}".format(zk_user, zk_secret)
auth_data = [(scheme, credential)]
zk = KazooClient(
hosts=zk_addr,
timeout=30,
connection_retry=conn_retry_policy,
command_retry=cmd_retry_policy,
default_acl=default_acl,
auth_data=auth_data,
)
zk.start()
return zk
def start_zk_client(opts):
    """Create and start the ZooKeeper client kept in ``gdata["zk_client"]``.

    The hosts come from ``opts.zk_hosts``. Unless ``opts.quiet`` is set,
    progress is reported on stdout: the target hosts before connecting, and
    ``SUCCEEDED`` or ``FAILED`` (taken from the client's ``connected`` flag)
    afterwards.
    """
    hosts = opts.zk_hosts
    if not opts.quiet:
        sys.stdout.write("connecting to " + hosts)

    client = KazooClient(hosts=hosts)
    gdata["zk_client"] = client
    client.start()

    if client.connected:
        res = "SUCCEEDED"
    else:
        res = "FAILED"

    if not opts.quiet:
        # Single parenthesised argument: prints the same under Python 2 and 3.
        print(" [{res}]".format(res=res))
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError, ConnectionLoss
# KazooClient subclass that can be used as a context manager.
class ExtendedKazooClient(KazooClient):
# Entering the ``with`` block starts the client and returns it.
def __enter__(self):
self.start()
return self
# Leaving the ``with`` block always stops the client; nothing is returned,
# so an exception raised inside the block is not suppressed.
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
# NOTE(review): only the docstring of command() is visible in this excerpt;
# the implementation presumably continues beyond it -- confirm.
def command(self, cmd='ruok'):
"""Send a command to the ZK node.
Overrides the method defined at
https://github.com/python-zk/kazoo/blob/release/2.4/kazoo/client.py#L637
as it could leave some data unread from the socket.
"""
def create_from_zookeeper(cls, zkconnect, default_retention=1, fetch_topics=True):
log.info("Connecting to zookeeper {0}".format(zkconnect))
try:
zk = KazooClient(zkconnect)
zk.start()
except Exception as e:
raise ZookeeperException("Cannot connect to Zookeeper: {0}".format(e))
# Get broker list
cluster = cls(retention=default_retention)
add_brokers_from_zk(cluster, zk)
# Get current partition state
if fetch_topics:
log.info("Getting partition list from Zookeeper")
for topic in zk.get_children("/brokers/topics"):
zdata, zstat = zk.get("/brokers/topics/{0}".format(topic))
add_topic_with_replicas(cluster, topic, json_loads(zdata))
set_topic_retention(cluster.topics[topic], zk)
def __init__(self,args):
    """Store the parsed options and open a ZooKeeper connection.

    When ``args.secure`` is set, ``kerberosWrapper.krb_wrapper`` is called
    with the principal, keytab and cache file, and the ``KRB5CCNAME``
    environment variable is pointed at ``args.cache_file``. The object it
    returned is destroyed again before the ZooKeeper client is created.

    The remaining options are copied onto the instance, ``self.tests`` maps
    each test name to its checker method, and finally a ``KazooClient`` for
    ``args.hosts`` is created and started as ``self.zk``.
    """
    secure = args.secure
    if secure:
        auth_token = kerberosWrapper.krb_wrapper(
            args.principal, args.keytab, args.cache_file)
        os.environ['KRB5CCNAME'] = args.cache_file

    # Plain option values.
    self.zkserver = args.hosts
    self.zk_client = args.zk_client
    self.test = args.test
    self.hdfs_cluster_name = args.hdfs_cluster_name

    # Test name -> bound checker method.
    self.tests = {
        'hdfs': self.check_hdfs,
        'hbase': self.check_hbase,
        'kafka': self.check_kafka,
    }
    self.check_topics = args.check_topics
    self.topic = args.topic

    if secure and auth_token:
        auth_token.destroy()

    connection = KazooClient(hosts=self.zkserver)
    self.zk = connection
    connection.start()
my_ip = appscale_info.get_private_ip()
is_master = (my_ip == appscale_info.get_headnode_ip())
is_lb = (my_ip in appscale_info.get_load_balancer_ips())
is_tq = (my_ip in appscale_info.get_taskqueue_nodes())
if is_master:
# Periodically check with the portal for new tasks.
# Note: Currently, any active handlers from the tornado app will block
# polling until they complete.
PeriodicCallback(poll, constants.POLLING_INTERVAL).start()
# Only master Hermes node handles /do_task route
task_route = ('/do_task', TaskHandler)
global zk_client
zk_client = KazooClient(
hosts=','.join(appscale_info.get_zk_node_ips()),
connection_retry=ZK_PERSISTENT_RECONNECTS)
zk_client.start()
# Start watching profiling configs in ZooKeeper
stats_app.ProfilingManager(zk_client)
# Periodically checks if the deployment is registered and uploads the
# appscalesensor app for registered deployments.
sensor_deployer = SensorDeployer(zk_client)
PeriodicCallback(sensor_deployer.deploy,
constants.UPLOAD_SENSOR_INTERVAL).start()
else:
task_route = ('/do_task', Respond404Handler,
dict(reason='Hermes slaves do not manage tasks from Portal'))
app = tornado.web.Application([