Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _play_snippet(self, file_path: str, webservice_url: str, volume: int = -1, duration_offset: float = 0, fade_in=False) -> None:
if not self._check_property():
return
if not self.is_coordinator:
sonos_speaker[self.coordinator]._play_snippet(file_path, webservice_url, volume, duration_offset, fade_in)
else:
with self._snippet_queue_lock:
snap = None
volumes = {}
# save all volumes from zone_member
for member in self.zone_group_members:
volumes[member] = sonos_speaker[member].volume
tag = TinyTag.get(file_path)
duration = round(tag.duration) + duration_offset
self.logger.debug("Sonos: TTS track duration offset is: {offset}s".format(offset=duration_offset))
self.logger.debug("Sonos: TTS track duration: {duration}s".format(duration=duration))
file_name = quote(os.path.split(file_path)[1])
snippet_url = "{url}/{file}".format(url=webservice_url, file=file_name)
# was GoogleTTS the last track? do not snapshot
last_station = self.radio_station.lower()
if last_station != "snippet":
snap = Snapshot(self.soco)
snap.snapshot()
time.sleep(0.5)
self.set_stop()
if volume == -1:
volume = self.volume
def start():
global pifm_proc
global playing_file
global start_time
global streaming
streaming = False
json = request.get_json()
file_name = json["file_name"]
freq = json["freq"]
file = TinyTag.get("static/audio/" + file_name, image=True)
radio_text = removeNonAscii(file.title)
station_name = removeNonAscii(file.artist)
# m = subprocess.Popen("./pifmrds -audio " + file_name + " -freq " + freq + " -rt " + radio_text)
# m.wait()
if pifm_proc and not pifm_proc.poll():
print("Killing")
# os.killpg(os.getpgid(pifm_proc.pid), signal.SIGTERM)
subprocess.Popen("sudo killall pifmrds", shell=True)
print("Killed")
pifm_proc = None
cmd = "sox -t mp3 {} -t wav - | sudo ./pifmrds -audio - -freq {} -rt '{}' -ps '{}'".format(file_name, freq, radio_text, station_name)
print("Cmd: {}".format(cmd))
pifm_proc = subprocess.Popen(cmd, shell=True, cwd="static/audio", preexec_fn=os.setsid)
if not running:
return jsonify({
"running": False,
}), 200
if streaming:
return jsonify({
"running": True,
"filename": playing_file,
"name": radio_text,
"time_elapsed": time.time() - start_time,
}), 200
file = playing_file
f = TinyTag.get("static/audio/" + file, image=True)
if not f.title:
f.title = file
img_path = None
img_exists = os.path.isfile("static/img/" + file.split(".")[0] + ".png")
if img_exists:
img_path = file.split(".")[0] + ".png"
return jsonify({
"running": True,
"filename": file,
"name": f.title,
"length": f.duration,
"author": f.artist,
"img": img_path,
def pop_param(name, _default):
if name in sys.argv:
idx = sys.argv.index(name)
sys.argv.pop(idx)
return sys.argv.pop(idx)
return _default
try:
save_image_path = pop_param('--save-image', None)
formatting = pop_param('--format', 'json')
filename = sys.argv[1]
except:
usage()
try:
tag = TinyTag.get(filename, image=save_image_path is not None)
if save_image_path:
image = tag.get_image()
if image:
with open(save_image_path, 'wb') as fh:
fh.write(image)
if formatting == 'json':
print(json.dumps(tag.as_dict()))
elif formatting == 'csv':
print('\n'.join('%s,%s' % (k, v) for k, v in tag.as_dict().items()))
elif formatting == 'tsv':
print('\n'.join('%s\t%s' % (k, v) for k, v in tag.as_dict().items()))
except TinyTagException as e:
sys.stderr.write(str(e))
def duration_tag(mp3path):
"""
Returns the duration of given mp3 file in millisecond
"""
Tag = tinytag.TinyTag.get(mp3path)
return Tag.duration * 1000
def get_tag(self, filename):
try:
return tinytag.TinyTag.get(filename)
except tinytag.TinyTagException as x:
print("Tag error:", x)
print("Occurred when processing file: ", filename.location.encode("ascii", errors="replace"))
return tinytag.TinyTag.get(filename, duration=False)
def getSongInfo(filepath):
try:
tag = TinyTag.get(filepath)
except LookupError:
return Metainfo()
# make sure everthing returned (except length) is a string
for attribute in ['artist','album','title','track']:
if getattr(tag, attribute) is None:
setattr(tag, attribute, '')
return Metainfo(tag.artist, tag.album, tag.title, str(tag.track), tag.duration)
def api_fetchalbumart(self, directory):
_save_and_release_session()
default_folder_image = "../res/img/folder.png"
log.i('Fetching album art for: %s' % directory)
filepath = os.path.join(cherry.config['media.basedir'], directory)
if os.path.isfile(filepath):
# if the given path is a file, try to get the image from ID3
tag = TinyTag.get(filepath, image=True)
image_data = tag.get_image()
if image_data:
log.d('Image found in tag.')
header = {'Content-Type': 'image/jpg', 'Content-Length': len(image_data)}
cherrypy.response.headers.update(header)
return image_data
else:
# if the file does not contain an image, display the image of the
# parent directory
directory = os.path.dirname(directory)
#try getting a cached album art image
b64imgpath = albumArtFilePath(directory)
img_data = self.albumartcache_load(b64imgpath)
if img_data:
cherrypy.response.headers["Content-Length"] = len(img_data)
def _fetch_embedded_image(self, path):
filetypes = ('.mp3',)
max_tries = 3
header, data, resized = None, '', False
try:
files = os.listdir(path)
files = (f for f in files if f.lower().endswith(filetypes))
for count, file_in_dir in enumerate(files, start=1):
if count > max_tries:
break
filepath = os.path.join(path, file_in_dir)
try:
tag = TinyTag.get(filepath, image=True)
image_data = tag.get_image()
except IOError:
continue
if not image_data:
continue
_header, _data = self.resize_image_data(
image_data, (self.IMAGE_SIZE, self.IMAGE_SIZE))
if _data:
header, data, resized = _header, _data, True
break
except OSError:
pass
return header, data, resized