Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
def getDegaraOnsets(XAudio, Fs, hopSize):
    """Estimate beats with Essentia's Degara beat tracker.

    Returns a ``(tempo, beats)`` tuple: the average tempo in BPM derived
    from the mean inter-beat interval, and the beat positions converted
    from seconds into hop-size frame indices (int64).
    """
    samples = essentia.array(XAudio)
    tracker = BeatTrackerDegara()
    beat_times = tracker(samples)
    # Average BPM: 60 seconds divided by the mean inter-beat gap.
    tempo = 60 / np.mean(beat_times[1:] - beat_times[:-1])
    # Convert beat times (seconds) into frame indices at the hop size.
    frame_idxs = np.array(np.round(beat_times * Fs / hopSize), dtype=np.int64)
    return (tempo, frame_idxs)
def double_beats(beats):
    """Double the beats.

    Inserts the midpoint between every consecutive pair of beats.  The
    final input beat is not emitted, since it has no successor to pair
    with, so the result has ``2 * (len(beats) - 1)`` entries.
    """
    doubled = []
    for left, right in zip(beats[:-1], beats[1:]):
        doubled.append(left)
        doubled.append((left + right) / 2.0)
    return essentia.array(doubled)
# NOTE(review): this span is the interior of a feature-extraction routine
# whose `def` line is outside this view (indentation has been stripped);
# features_file, list_to_array, audio_file, SAMPLE_RATE, compute_beats,
# compute_beatsync_features, audio_beats and msaf are defined elsewhere.
# If already precomputed, read the cached JSON features and return them.
if os.path.exists(features_file):
with open(features_file, "r") as f:
features = json.load(f)
return list_to_array(features)
# Load Audio as mono at the project sample rate.
logging.info("Loading audio file %s" % os.path.basename(audio_file))
audio = ES.MonoLoader(filename=audio_file, sampleRate=SAMPLE_RATE)()
# Duration in seconds, derived from the sample count.
duration = len(audio) / float(SAMPLE_RATE)
# Estimate Beats
features = {}
ticks, conf = compute_beats(audio)
ticks = np.concatenate(([0], ticks, [duration]))  # Add first and last time
# De-duplicate in case a detected beat coincides with 0 or the duration.
ticks = essentia.array(np.unique(ticks))
features["beats"] = ticks.tolist()
# Compute Beat-sync features
features["mfcc"], features["hpcp"], features["tonnetz"] = \
compute_beatsync_features(ticks, audio)
# Save output as audio file with beep markers on each beat (debug aid).
if audio_beats:
logging.info("Saving Beats as an audio file")
marker = ES.AudioOnsetsMarker(onsets=ticks, type='beep',
sampleRate=SAMPLE_RATE)
marked_audio = marker(audio)
ES.MonoWriter(filename='beats.wav',
sampleRate=SAMPLE_RATE)(marked_audio)
# Save features
# NOTE(review): the two statements below duplicate the marker/MonoWriter
# call just above but write at msaf.Anal.sample_rate instead of
# SAMPLE_RATE — looks like a stale copy from another revision; confirm
# which sample rate is intended and whether this duplicate should exist.
marked_audio = marker(audio)
ES.MonoWriter(filename='beats.wav',
sampleRate=msaf.Anal.sample_rate)(marked_audio)
# Read annotations if they exist in path/references_dir/file.jams
# NOTE(review): interior fragment — file_struct, jams2, compute_features
# and the `audio` variable are defined outside this view.
if os.path.isfile(file_struct.ref_file):
jam = jams2.load(file_struct.ref_file)
# If beat annotations exist, also compute annotated beat-synchronous
# features from the human-labelled beat times.
if jam.beats != []:
logging.info("Reading beat annotations from JAMS")
annot = jam.beats[0]
annot_beats = []
for data in annot.data:
annot_beats.append(data.time.value)
# De-duplicate and sort the annotated beat times (np.unique sorts).
annot_beats = essentia.array(np.unique(annot_beats).tolist())
annot_mfcc, annot_hpcp, annot_tonnetz = compute_features(
audio, annot_beats)
# Save output as json file via Essentia's YamlOutput in JSON mode.
# NOTE(review): interior fragment — out_file, features and msaf come from
# the enclosing (unseen) scope.
logging.info("Saving the JSON file in %s" % out_file)
yaml = YamlOutput(filename=out_file, format='json')
pool = essentia.Pool()
pool.add("beats.times", features["beats"])
# NOTE(review): features["beats_conf"] is not populated anywhere in this
# view (only features["beats"] is) — confirm it is set upstream, else
# this raises KeyError.
pool.add("beats.confidence", features["beats_conf"])
pool.set("analysis.sample_rate", msaf.Anal.sample_rate)
# NOTE(review): the "frame_rate" key is filled from frame_size — looks
# like a key/value mismatch; verify the intended field.
pool.set("analysis.frame_rate", msaf.Anal.frame_size)
pool.set("analysis.hop_size", msaf.Anal.hop_size)
pool.set("analysis.window_type", msaf.Anal.window_type)
pool.set("analysis.mfcc_coeff", msaf.Anal.mfcc_coeff)
# Timestamp the analysis so cached results can be dated.
pool.set("timestamp",
datetime.datetime.today().strftime("%Y/%m/%d %H:%M:%S"))
def getDegaraOnsets(XAudio, Fs, hopSize):
    """
    Call Essentia's implementation of Degara's beat tracking technique.

    :param XAudio: Numpy array of raw audio samples
    :param Fs: Sample rate of the audio (Hz)
    :param hopSize: Hop size (in samples) of each onset function value
    :returns (tempo, beats): Average tempo in BPM and a numpy int64
        array of beat positions expressed as hop-size frame indices.
        If fewer than two beats are detected, tempo is 0.0.
    """
    from essentia import Pool, array
    import essentia.standard as ess
    X = array(XAudio)
    b = ess.BeatTrackerDegara()
    beats = b(X)  # beat times in seconds
    if len(beats) < 2:
        # No inter-beat interval exists; np.mean over the empty diff
        # would emit a RuntimeWarning and yield a nan tempo.
        tempo = 0.0
    else:
        # Average BPM from the mean inter-beat interval.
        tempo = 60 / np.mean(np.diff(beats))
    # Convert beat times (seconds) to frame indices at the hop size.
    beats = np.array(np.round(beats * Fs / hopSize), dtype=np.int64)
    return (tempo, beats)
normalized (string ∈ {none, unitSum, unitMax}, default = unitMax) :
whether to normalize the HPCP vector
referenceFrequency : (real ∈ (0, ∞), default = 440) :
the reference frequency for semitone index calculation, corresponding to A3 [Hz]
sampleRate : (real ∈ (0, ∞), default = 44100) :
the sampling rate of the audio signal [Hz]
numBins : (integer ∈ [12, ∞), default = 12) :
the size of the output HPCP (must be a positive nonzero multiple of 12)
whitening : (boolean (True, False), default = False)
Optional step of applying spectral whitening to the spectral peak magnitudes
'''
# NOTE(review): interior of a method — self.audio_vector, self.fs, and
# the frameSize/hopSize/windowType/maxFrequency/minFrequency/maxPeaks
# parameters come from the enclosing (unseen) scope.
audio = array(self.audio_vector)
#print audio.shape
# Slice the audio into overlapping analysis frames.
frameGenerator = estd.FrameGenerator(audio, frameSize=frameSize, hopSize=hopSize)
window = estd.Windowing(type=windowType)
spectrum = estd.Spectrum()
# Refer http://essentia.upf.edu/documentation/reference/std_SpectralPeaks.html
# Peaks ordered by frequency, which HPCP expects as its input.
spectralPeaks = estd.SpectralPeaks(magnitudeThreshold=0,
maxFrequency=maxFrequency,
minFrequency=minFrequency,
maxPeaks=maxPeaks,
orderBy="frequency",
sampleRate=self.fs)
def save_features(key, pool, mfcc, hpcp, tonnetz):
    """Saves the features into the specified pool under the given key.

    :param key: Prefix for the pool descriptor names
        (e.g. "<key>.mfcc", "<key>.hpcp", "<key>.tonnetz").
    :param pool: essentia.Pool that the per-frame vectors are added to.
    :param mfcc: Iterable of per-frame MFCC coefficient vectors.
    :param hpcp: Iterable of per-frame HPCP vectors.
    :param tonnetz: Iterable of per-frame Tonnetz vectors.
    """
    # Plain loops instead of list comprehensions used only for their
    # side effects (the original built throwaway lists and discarded them).
    for mfcc_coeff in mfcc:
        pool.add(key + ".mfcc", essentia.array(mfcc_coeff))
    for hpcp_coeff in hpcp:
        pool.add(key + ".hpcp", essentia.array(hpcp_coeff))
    for tonnetz_coeff in tonnetz:
        pool.add(key + ".tonnetz", essentia.array(tonnetz_coeff))
def detect(self, expression_style_note, expression_style_ts):
"""Detect vibrato on notes and update the expression-style records.

Runs Essentia's Vibrato detector on the pitch contour of every note
whose vibrato flag is still unset, collects the notes where vibrato
is found, and merges them back into both the per-note matrix and the
time-segment record via the Common helpers.

:param expression_style_note: 2D array of notes; columns 0:3 hold
    (pitch, onset, duration) and column 11 holds the vibrato flag
    (presumably — TODO confirm against the matrix layout docs).
:param expression_style_ts: time-segment expression-style record.
:returns: (expression_style_note, expression_style_ts) updated.
"""
# loop in notes
for index_note, note in enumerate(expression_style_note):
# only analyse notes whose vibrato flag (column 11) is still unset
if expression_style_note[index_note, 11]==0:
# convert time to frame number
# NOTE(review): assumes self.pitch_contour is indexed at
# self.sampleRate resolution (one value per audio sample) —
# confirm; contours are often stored at hop-size rate instead.
onset_frame = int(round(note[1]*self.sampleRate))
offset_frame = int(round((note[1]+note[2])*self.sampleRate))
# extract the pitch contour of the note
pc = self.pitch_contour[onset_frame:offset_frame]
# detect vibrato on the note
freq, extent = Vibrato(sampleRate=self.sampleRate)(essentia.array(pc))
# append the note if vibrato was found (any nonzero freq/extent)
if np.count_nonzero(freq)!=0 and np.count_nonzero(extent)!=0:
self.vibrato = np.append(self.vibrato,[expression_style_note[index_note,0:3]],axis=0)
# convert vibrato note to time segment
time_segment = Common.note_2_ts(self.vibrato)
# update the result
expression_style_ts = Common.update_ts(expression_style_ts, time_segment, technique=self.technique)
expression_style_note = Common.update_esn(expression_style_note=expression_style_note,
note_with_expression_style=self.vibrato,
technique=self.technique,
sub_technique=1)
return expression_style_note, expression_style_ts
def save_features(key, pool, mfcc, hpcp, tonnetz):
    """Saves the features into the specified pool under the given key."""
    # Table-driven: each descriptor suffix paired with its frame sequence.
    descriptors = ((".mfcc", mfcc), (".hpcp", hpcp), (".tonnetz", tonnetz))
    for suffix, frames in descriptors:
        label = key + suffix
        for frame in frames:
            pool.add(label, essentia.array(frame))