Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _determine_duration(self, fh):
# Walks the MP3 frame headers in fh, starting right after the ID3v2 tag, and
# counts frames for determining the duration; a usable Xing header found in
# the first frame short-circuits the scan.
# NOTE(review): this listing appears to be an excerpt with lines missing --
# idx, channel_mode, mpeg_id, layer_id, br_id, sr_id, padding, bitrate_accu
# and last_bitrates are used below but never assigned in the visible code.
# frame budget for the estimation: _MAX_ESTIMATION_SEC seconds at 44100 Hz,
# divided by 1152 (presumably the samples per frame -- confirm)
max_estimation_frames = (ID3._MAX_ESTIMATION_SEC*44100) // 1152
frame_size_mean = 0
# set sample rate from first found frame later, default to 44khz
file_sample_rate = 44100
# see this page for the magic values used in mp3:
# http://www.mpgedit.org/mpgedit/mpeg_format/mpeghdr.htm
# NOTE(review): the local bitrates/samplerates tables are not read in the
# visible lines; the lookups below go through the ID3 class tables instead.
bitrates = [0, 32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192,
224, 256, 320]
samplerates = [44100, 48000, 32000]
header_bytes = 4
frames = 0 # count frames for determining mp3 duration
# seek to first position after id3 tag (speedup for large header)
fh.seek(self._bytepos_after_id3v2)
while True:
# reading through garbage until 12 '1' bits are found
b = fh.read(1)
if len(b) == 0:
fh.seek(max(idx, 1), os.SEEK_CUR)
continue
try:
# translate the ids taken from the frame header into channel count,
# bitrate and sample rate via the lookup tables
self.channels = self.channels_per_channel_mode[channel_mode]
frame_bitrate = ID3.bitrate_by_version_by_layer[mpeg_id][layer_id][br_id]
self.samplerate = ID3.samplerates[mpeg_id][sr_id]
except (IndexError, TypeError):
# a failed table lookup is reported as a parsing failure
raise TinyTagException('mp3 parsing failed')
# There might be a xing header in the first frame that contains
# all the info we need, otherwise parse multiple frames to find the
# accurate average bitrate
if frames == 0 and ID3._USE_XING_HEADER:
xing_header_offset = b.find(b'Xing')
if xing_header_offset != -1:
# move forward from the current position by the offset of the marker
fh.seek(xing_header_offset, os.SEEK_CUR)
xframes, byte_count, toc, vbr_scale = ID3._parse_xing_header(fh)
if xframes and xframes != 0 and byte_count:
# duration = frame count * samples per frame / sample rate;
# bitrate = byte count * 8 bits over that duration, divided by 1000
self.duration = xframes * ID3.samples_per_frame / float(self.samplerate)
self.bitrate = byte_count * 8 / self.duration / 1000
self.audio_offset = fh.tell()
return
continue
frames += 1 # it's most probably an mp3 frame
bitrate_accu += frame_bitrate
if frames == 1:
# remember the file position at which the first frame was counted
self.audio_offset = fh.tell()
if frames <= ID3._CBR_DETECTION_FRAME_COUNT:
# only the first _CBR_DETECTION_FRAME_COUNT bitrates are collected
last_bitrates.append(frame_bitrate)
fh.seek(4, os.SEEK_CUR) # jump over peeked bytes
# frame length derived from the frame bitrate, the sample rate and the
# padding value
frame_length = (144000 * frame_bitrate) // self.samplerate + padding
def _determine_duration(self, fh):
# Scans fh from the end of the ID3v2 tag, decoding 4-byte MP3 frame headers
# and counting frames for determining the duration.
# NOTE(review): the excerpt ends inside the while loop; how the decoded
# fields are used afterwards is not visible here.
# frame budget for the estimation: _MAX_ESTIMATION_SEC seconds at 44100 Hz,
# expressed in frames of ID3.samples_per_frame samples
max_estimation_frames = (ID3._MAX_ESTIMATION_SEC * 44100) // ID3.samples_per_frame
frame_size_accu = 0
header_bytes = 4
frames = 0 # count frames for determining mp3 duration
bitrate_accu = 0 # add up bitrates to find average bitrate to detect
last_bitrates = [] # CBR mp3s (multiple frames with same bitrates)
# seek to first position after id3 tag (speedup for large header)
fh.seek(self._bytepos_after_id3v2)
while True:
# reading through garbage until 11 '1' sync-bits are found
# presumably fh is a buffered reader whose peek() does not advance the
# position and may return more bytes than asked for (hence b[0:4] below)
b = fh.peek(4)
if len(b) < 4:
break # EOF
# split the header into its four bytes, each read as an unsigned integer
sync, conf, bitrate_freq, rest = struct.unpack('BBBB', b[0:4])
br_id = (bitrate_freq >> 4) & 0x0F # bitrate id
sr_id = (bitrate_freq >> 2) & 0x03 # sample rate id
# bit 1 of the third header byte is the padding flag
padding = 1 if bitrate_freq & 0x02 > 0 else 0
def file_list():
# Collects one entry per file found under static/audio into a dict keyed by a
# running index; each entry holds the file name, title, duration, artist and
# the name of a matching cover image (or None).
# NOTE(review): the excerpt stops at the bare `except:` -- the handler body
# and whatever is done with payload afterwards are not visible here.
payload = {}
# p = subprocess.Popen("ls", stdout=subprocess.PIPE, shell=True, cwd="static/audio")
# p.wait()
# files = p.stdout.readlines()
# files = [x.strip() for x in files]
i = 0
# NOTE(review): os.walk also yields sub-directories, but the path handed to
# TinyTag.get below is always "static/audio/" + name and ignores r, so files
# in nested directories would be looked up in the wrong place.
for r, d, f in os.walk("static/audio"):
for f2 in f:
try:
file = f2
# NOTE(review): f is rebound from the walk's file list to the tag object
# here; the running inner loop keeps iterating the original list.
f = TinyTag.get("static/audio/" + file, image=True)
if not f.title:
# fall back to the file name when the tag has no title
f.title = file
# the cover is looked up under the text before the first "." of the name
img_exists = os.path.isfile("static/img/" + file.split(".")[0] + ".png")
if img_exists:
img_path = file.split(".")[0] + ".png"
else:
img_path = None
payload[i] = {
"filename": file,
"name": f.title,
"length": f.duration,
"author": f.artist,
"img": img_path
}
i += 1
# NOTE(review): a bare except also traps KeyboardInterrupt/SystemExit;
# consider narrowing it to the errors TinyTag.get can raise.
except:
def _play_snippet(self, file_path: str, webservice_url: str, volume: int = -1, duration_offset: float = 0, fade_in=False) -> None:
# Prepares playback of the audio file at file_path: reads its duration,
# builds the snippet URL from webservice_url and the file's base name,
# snapshots the current state and stops the speaker.
# NOTE(review): the excerpt ends before the playback itself -- snap, volumes,
# duration, snippet_url and fade_in are prepared but not used in the visible
# lines.
if not self._check_property():
return
if not self.is_coordinator:
# speakers that are not the coordinator forward the call to the coordinator
sonos_speaker[self.coordinator]._play_snippet(file_path, webservice_url, volume, duration_offset, fade_in)
else:
# presumably a lock that serializes snippet handling -- confirm
with self._snippet_queue_lock:
snap = None
volumes = {}
# save all volumes from zone_member
for member in self.zone_group_members:
volumes[member] = sonos_speaker[member].volume
tag = TinyTag.get(file_path)
# the track duration is rounded to whole seconds before the offset is added
duration = round(tag.duration) + duration_offset
self.logger.debug("Sonos: TTS track duration offset is: {offset}s".format(offset=duration_offset))
self.logger.debug("Sonos: TTS track duration: {duration}s".format(duration=duration))
# only the last path component is quoted and appended to the base URL
file_name = quote(os.path.split(file_path)[1])
snippet_url = "{url}/{file}".format(url=webservice_url, file=file_name)
# was GoogleTTS the last track? do not snapshot
last_station = self.radio_station.lower()
if last_station != "snippet":
snap = Snapshot(self.soco)
snap.snapshot()
time.sleep(0.5)
self.set_stop()
# -1 is the sentinel for "use the speaker's current volume"
if volume == -1:
volume = self.volume
# NOTE(review): excerpt from the middle of a chunk-parsing loop -- the
# enclosing def, the loop header and the assignments of subchunkid and
# subchunksize are not visible here.
is_info = fh.read(4) # check INFO header
if is_info != b'INFO': # jump over non-INFO sections
# 4 bytes of the sub-chunk were already consumed by the read above
fh.seek(subchunksize - 4, os.SEEK_CUR)
else:
# load the remainder of the sub-chunk into memory and parse it from there
sub_fh = BytesIO(fh.read(subchunksize - 4))
field = sub_fh.read(4)
# each entry is a 4-byte field id, a 4-byte length and the data itself;
# the loop stops when no further field id can be read
while len(field):
# NOTE(review): 'I' without a byte-order prefix uses the machine's native
# order and alignment; RIFF is presumably little-endian, so '<I' would be
# explicit -- confirm. struct.error is raised if fewer than 4 bytes remain.
data_length = struct.unpack('I', sub_fh.read(4))[0]
# NOTE(review): odd-length entries may be followed by a pad byte that is
# not skipped here -- verify against the RIFF specification.
data = sub_fh.read(data_length).split(b'\x00', 1)[0] # strip zero-byte
data = codecs.decode(data, 'utf-8')
# field ids without an entry in riff_mapping are ignored
fieldname = self.riff_mapping.get(field)
if fieldname:
self._set_field(fieldname, data)
field = sub_fh.read(4)
elif subchunkid == b'id3 ' or subchunkid == b'ID3 ':
# embedded ID3v2 tag: parse it with the ID3 reader and merge the result
id3 = ID3(fh, 0)
id3._parse_id3v2(fh)
self.update(id3)
else: # some other chunk, just skip the data
fh.seek(subchunksize, 1)
# read the next 8-byte chunk header
chunk_header = fh.read(8)
self._duration_parsed = True
def start():
# Starts broadcasting the audio file named in the request's JSON body: reads
# its tags, stops a previously started pifmrds process and launches a
# sox | pifmrds shell pipeline in static/audio.
# NOTE(review): the excerpt ends after the Popen call; playing_file and
# start_time are declared global but not assigned in the visible lines, and
# no response is returned here.
global pifm_proc
global playing_file
global start_time
global streaming
streaming = False
# NOTE(review): this local shadows any module named json inside the function
json = request.get_json()
file_name = json["file_name"]
freq = json["freq"]
file = TinyTag.get("static/audio/" + file_name, image=True)
# title and artist become the -rt and -ps arguments of pifmrds below
radio_text = removeNonAscii(file.title)
station_name = removeNonAscii(file.artist)
# m = subprocess.Popen("./pifmrds -audio " + file_name + " -freq " + freq + " -rt " + radio_text)
# m.wait()
# NOTE(review): poll() returns None while the process runs and its exit code
# afterwards, so `not poll()` is also true for a process that exited with 0.
if pifm_proc and not pifm_proc.poll():
print("Killing")
# os.killpg(os.getpgid(pifm_proc.pid), signal.SIGTERM)
# NOTE(review): the killall command is not waited for before the new
# pipeline is started below.
subprocess.Popen("sudo killall pifmrds", shell=True)
print("Killed")
pifm_proc = None
# NOTE(review): SECURITY -- file_name and freq come straight from the request
# body and the tag strings from the file; all are interpolated into a command
# line run with shell=True, which allows shell injection. Quote each value
# (e.g. shlex.quote) or pass argument lists without a shell.
cmd = "sox -t mp3 {} -t wav - | sudo ./pifmrds -audio - -freq {} -rt '{}' -ps '{}'".format(file_name, freq, radio_text, station_name)
print("Cmd: {}".format(cmd))
# os.setsid runs in the child before exec, putting it in a new session
pifm_proc = subprocess.Popen(cmd, shell=True, cwd="static/audio", preexec_fn=os.setsid)
# NOTE(review): excerpt from a request handler -- the enclosing def, the
# assignments of running and radio_text, and the end of the final jsonify(...)
# call are not visible here.
# nothing is being broadcast: report that with HTTP 200
if not running:
return jsonify({
"running": False,
}), 200
# streaming mode: report the elapsed time instead of per-file tag data
if streaming:
return jsonify({
"running": True,
"filename": playing_file,
"name": radio_text,
"time_elapsed": time.time() - start_time,
}), 200
# file playback: describe the file currently being played from its tags
file = playing_file
f = TinyTag.get("static/audio/" + file, image=True)
if not f.title:
# fall back to the file name when the tag has no title
f.title = file
img_path = None
# the cover is looked up under the text before the first "." of the name
img_exists = os.path.isfile("static/img/" + file.split(".")[0] + ".png")
if img_exists:
img_path = file.split(".")[0] + ".png"
return jsonify({
"running": True,
"filename": file,
"name": f.title,
"length": f.duration,
"author": f.artist,
"img": img_path,
def _set_field(self, fieldname, bytestring, transfunc=None):
    """Set the tag attribute ``fieldname`` unless it already holds a value.

    :param fieldname: name of the attribute on ``self`` to populate.
    :param bytestring: payload read from the file.
    :param transfunc: optional callable applied to ``bytestring``; its
        result is what gets stored.  The payload is stored unchanged
        when omitted.

    An existing truthy value is never overwritten.  A ``genre`` made
    up only of digits is treated as an index into ``ID3.ID3V1_GENRES``
    and replaced by the genre stored there when the index is in range.
    ``track`` and ``disc`` values of the form ``"current/total"`` are
    split: the first part is stored on ``fieldname`` and the second on
    ``<fieldname>_total``.
    """
    if getattr(self, fieldname):  # do not overwrite existing data
        return
    value = bytestring if transfunc is None else transfunc(bytestring)
    if DEBUG:
        stderr('Setting field "%s" to "%s"' % (fieldname, value))
    if fieldname == 'genre':
        try:
            # isdigit() also accepts characters such as u'\u00b2' that
            # int() rejects, and non-string values have no isdigit();
            # in both cases the value is stored as it is.
            genre_id = int(value) if value.isdigit() else None
        except (AttributeError, ValueError):
            genre_id = None
        if genre_id is not None and genre_id < len(ID3.ID3V1_GENRES):
            # funky: id3v1 genre hidden in a id3v2 field
            value = ID3.ID3V1_GENRES[genre_id]
    if fieldname in ("track", "disc"):
        # the type is compared by name so that Python 2 ``unicode`` is
        # covered without referencing a name missing on Python 3
        if type(value).__name__ in ('str', 'unicode') and '/' in value:
            current, total = value.split('/')[:2]
            setattr(self, "%s_total" % fieldname, total)
        else:
            current = value
        setattr(self, fieldname, current)
    else:
        setattr(self, fieldname, value)
def _set_field(self, fieldname, bytestring, transfunc=None):
    """Set the tag attribute ``fieldname`` unless it already holds a value.

    :param fieldname: name of the attribute on ``self`` to populate.
    :param bytestring: payload read from the file.
    :param transfunc: optional callable applied to ``bytestring``; its
        result is what gets stored.  The payload is stored unchanged
        when omitted.

    An existing truthy value is never overwritten.  A ``genre`` made
    up only of digits is treated as an index into ``ID3.ID3V1_GENRES``
    and replaced by the genre stored there when the index is in range.
    ``track`` and ``disc`` values of the form ``"current/total"`` are
    split: the first part is stored on ``fieldname`` and the second on
    ``<fieldname>_total``.
    """
    if getattr(self, fieldname):  # do not overwrite existing data
        return
    value = bytestring if transfunc is None else transfunc(bytestring)
    if DEBUG:
        stderr('Setting field "%s" to "%s"' % (fieldname, value))
    if fieldname == 'genre':
        try:
            # isdigit() also accepts characters such as u'\u00b2' that
            # int() rejects, and non-string values have no isdigit();
            # in both cases the value is stored as it is.
            genre_id = int(value) if value.isdigit() else None
        except (AttributeError, ValueError):
            genre_id = None
        if genre_id is not None and genre_id < len(ID3.ID3V1_GENRES):
            # funky: id3v1 genre hidden in a id3v2 field
            value = ID3.ID3V1_GENRES[genre_id]
    if fieldname in ("track", "disc"):
        # the type is compared by name so that Python 2 ``unicode`` is
        # covered without referencing a name missing on Python 3
        if type(value).__name__ in ('str', 'unicode') and '/' in value:
            current, total = value.split('/')[:2]
            setattr(self, "%s_total" % fieldname, total)
        else:
            current = value
        setattr(self, fieldname, current)
    else:
        setattr(self, fieldname, value)