diff --git a/augmentation/spec_stretch.py b/augmentation/spec_stretch.py index 5d90518ec..6f1a394ef 100644 --- a/augmentation/spec_stretch.py +++ b/augmentation/spec_stretch.py @@ -48,7 +48,9 @@ def process_item(self, item: dict, key_shift=0., speed=1., replace_spk_id=None) ).cpu().numpy() f0, _ = self.pe.get_pitch( - wav, aug_item['length'], hparams, speed=speed, interp_uv=hparams['interp_uv'] + wav, samplerate=hparams['audio_sample_rate'], length=aug_item['length'], + hop_size=hparams['hop_size'], f0_min=hparams['f0_min'], f0_max=hparams['f0_max'], + speed=speed, interp_uv=hparams['interp_uv'] ) aug_item['f0'] = f0.astype(np.float32) diff --git a/basics/base_pe.py b/basics/base_pe.py index c5d7a68d8..78179a033 100644 --- a/basics/base_pe.py +++ b/basics/base_pe.py @@ -1,3 +1,7 @@ class BasePE: - def get_pitch(self, waveform, length, hparams, interp_uv=False, speed=1): + def get_pitch( + self, waveform, samplerate, length, + *, hop_size, f0_min=65, f0_max=1100, + speed=1, interp_uv=False + ): raise NotImplementedError() diff --git a/configs/acoustic.yaml b/configs/acoustic.yaml index f32b878e3..b9f4e1a75 100644 --- a/configs/acoustic.yaml +++ b/configs/acoustic.yaml @@ -54,11 +54,13 @@ mel_vmax: 1.5 interp_uv: true energy_smooth_width: 0.12 breathiness_smooth_width: 0.12 +tension_smooth_width: 0.12 use_spk_id: false f0_embed_type: continuous use_energy_embed: false use_breathiness_embed: false +use_tension_embed: false use_key_shift_embed: false use_speed_embed: false diff --git a/configs/variance.yaml b/configs/variance.yaml index 1951f685e..efbb6c79a 100644 --- a/configs/variance.yaml +++ b/configs/variance.yaml @@ -40,6 +40,7 @@ predict_dur: true predict_pitch: true predict_energy: false predict_breathiness: false +predict_tension: false dur_prediction_args: arch: fs2 @@ -77,6 +78,9 @@ energy_smooth_width: 0.12 breathiness_db_min: -96.0 breathiness_db_max: -20.0 breathiness_smooth_width: 0.12 +tension_logit_min: -10.0 +tension_logit_max: 10.0 +tension_smooth_width: 0.12 variances_prediction_args: total_repeat_bins: 48 diff --git a/inference/ds_acoustic.py b/inference/ds_acoustic.py index b3254046e..1f828c9e7 100644 --- a/inference/ds_acoustic.py +++ b/inference/ds_acoustic.py @@ -32,6 +32,8 @@ def __init__(self, device=None, load_model=True, load_vocoder=True, ckpt_steps=N self.variances_to_embed.add('energy') if hparams.get('use_breathiness_embed', False): self.variances_to_embed.add('breathiness') + if hparams.get('use_tension_embed', False): + self.variances_to_embed.add('tension') self.ph_encoder = TokenTextEncoder(vocab_list=build_phoneme_list()) if hparams['use_spk_id']: diff --git a/inference/val_nsf_hifigan.py b/inference/val_nsf_hifigan.py index b754ab69a..290b6e6ef 100644 --- a/inference/val_nsf_hifigan.py +++ b/inference/val_nsf_hifigan.py @@ -61,7 +61,10 @@ def get_pitch(wav_data, mel, hparams, threshold=0.3): if not filename.endswith('.wav'): continue wav, mel = vocoder.wav2spec(os.path.join(in_path, filename)) - f0, _ = get_pitch_parselmouth(wav, len(mel), hparams) + f0, _ = get_pitch_parselmouth( + wav, samplerate=hparams['audio_sample_rate'], length=len(mel), + hop_size=hparams['hop_size'] + ) wav_out = vocoder.spec2wav(mel, f0=f0) save_wav(wav_out, os.path.join(out_path, filename), hparams['audio_sample_rate']) diff --git a/modules/fastspeech/acoustic_encoder.py b/modules/fastspeech/acoustic_encoder.py index 61b04a4fa..a9d4d849f 100644 --- a/modules/fastspeech/acoustic_encoder.py +++ b/modules/fastspeech/acoustic_encoder.py @@ -36,10 +36,13 @@ def __init__(self, 
vocab_size): self.variance_embed_list = [] self.use_energy_embed = hparams.get('use_energy_embed', False) self.use_breathiness_embed = hparams.get('use_breathiness_embed', False) + self.use_tension_embed = hparams.get('use_tension_embed', False) if self.use_energy_embed: self.variance_embed_list.append('energy') if self.use_breathiness_embed: self.variance_embed_list.append('breathiness') + if self.use_tension_embed: + self.variance_embed_list.append('tension') self.use_variance_embeds = len(self.variance_embed_list) > 0 if self.use_variance_embeds: diff --git a/modules/fastspeech/param_adaptor.py b/modules/fastspeech/param_adaptor.py index 7d905066e..e257186ab 100644 --- a/modules/fastspeech/param_adaptor.py +++ b/modules/fastspeech/param_adaptor.py @@ -5,7 +5,7 @@ from modules.diffusion.ddpm import MultiVarianceDiffusion from utils.hparams import hparams -VARIANCE_CHECKLIST = ['energy', 'breathiness'] +VARIANCE_CHECKLIST = ['energy', 'breathiness', 'tension'] class ParameterAdaptorModule(torch.nn.Module): @@ -14,10 +14,13 @@ def __init__(self): self.variance_prediction_list = [] self.predict_energy = hparams.get('predict_energy', False) self.predict_breathiness = hparams.get('predict_breathiness', False) + self.predict_tension = hparams.get('predict_tension', False) if self.predict_energy: self.variance_prediction_list.append('energy') if self.predict_breathiness: self.variance_prediction_list.append('breathiness') + if self.predict_tension: + self.variance_prediction_list.append('tension') self.predict_variances = len(self.variance_prediction_list) > 0 def build_adaptor(self, cls=MultiVarianceDiffusion): @@ -38,6 +41,16 @@ def build_adaptor(self, cls=MultiVarianceDiffusion): )) clamps.append((hparams['breathiness_db_min'], 0.)) + if self.predict_tension: + ranges.append(( + hparams['tension_logit_min'], + hparams['tension_logit_max'] + )) + clamps.append(( + hparams['tension_logit_min'], + hparams['tension_logit_max'] + )) + variances_hparams = hparams['variances_prediction_args'] total_repeat_bins = variances_hparams['total_repeat_bins'] assert total_repeat_bins % len(self.variance_prediction_list) == 0, \ diff --git a/modules/pe/pm.py b/modules/pe/pm.py index dc71b3398..d7e868907 100644 --- a/modules/pe/pm.py +++ b/modules/pe/pm.py @@ -3,5 +3,12 @@ class ParselmouthPE(BasePE): - def get_pitch(self, waveform, length, hparams, interp_uv=False, speed=1): - return get_pitch_parselmouth(waveform, length, hparams, speed=speed, interp_uv=interp_uv) + def get_pitch( + self,waveform, samplerate, length, + *, hop_size, f0_min=65, f0_max=1100, + speed=1, interp_uv=False + ): + return get_pitch_parselmouth( + waveform, samplerate=samplerate, length=length, + hop_size=hop_size, speed=speed, interp_uv=interp_uv + ) diff --git a/modules/pe/pw.py b/modules/pe/pw.py index 015b458ab..6fe629f80 100644 --- a/modules/pe/pw.py +++ b/modules/pe/pw.py @@ -3,15 +3,20 @@ import pyworld as pw from utils.pitch_utils import interp_f0 -class HarvestPE(BasePE): - def get_pitch(self, waveform, length, hparams, interp_uv=False, speed=1): - hop_size = int(np.round(hparams['hop_size'] * speed)) - time_step = 1000 * hop_size / hparams['audio_sample_rate'] - f0_floor = hparams['f0_min'] - f0_ceil = hparams['f0_max'] +class HarvestPE(BasePE): + def get_pitch( + self, waveform, samplerate, length, + *, hop_size, f0_min=65, f0_max=1100, + speed=1, interp_uv=False + ): + hop_size = int(np.round(hop_size * speed)) + time_step = 1000 * hop_size / samplerate - f0, _ = pw.harvest(waveform.astype(np.float64), 
hparams['audio_sample_rate'], f0_floor=f0_floor, f0_ceil=f0_ceil, frame_period=time_step) + f0, _ = pw.harvest( + waveform.astype(np.float64), samplerate, + f0_floor=f0_min, f0_ceil=f0_max, frame_period=time_step + ) f0 = f0.astype(np.float32) if f0.size < length: @@ -22,4 +27,3 @@ def get_pitch(self, waveform, length, hparams, interp_uv=False, speed=1): if interp_uv: f0, uv = interp_f0(f0, uv) return f0, uv - \ No newline at end of file diff --git a/modules/pe/rmvpe/inference.py b/modules/pe/rmvpe/inference.py index 49b785c53..816a3582b 100644 --- a/modules/pe/rmvpe/inference.py +++ b/modules/pe/rmvpe/inference.py @@ -52,13 +52,17 @@ def infer_from_audio(self, audio, sample_rate=16000, thred=0.03, use_viterbi=Fal f0 = self.decode(hidden, thred=thred, use_viterbi=use_viterbi) return f0 - def get_pitch(self, waveform, length, hparams, interp_uv=False, speed=1): - f0 = self.infer_from_audio(waveform, sample_rate=hparams['audio_sample_rate']) + def get_pitch( + self, waveform, samplerate, length, + *, hop_size, f0_min=65, f0_max=1100, + speed=1, interp_uv=False + ): + f0 = self.infer_from_audio(waveform, sample_rate=samplerate) uv = f0 == 0 f0, uv = interp_f0(f0, uv) - hop_size = int(np.round(hparams['hop_size'] * speed)) - time_step = hop_size / hparams['audio_sample_rate'] + hop_size = int(np.round(hop_size * speed)) + time_step = hop_size / samplerate f0_res = resample_align_curve(f0, 0.01, time_step, length) uv_res = resample_align_curve(uv.astype(np.float32), 0.01, time_step, length) > 0.5 if not interp_uv: diff --git a/preprocessing/acoustic_binarizer.py b/preprocessing/acoustic_binarizer.py index c6cf48b24..8c81abd52 100644 --- a/preprocessing/acoustic_binarizer.py +++ b/preprocessing/acoustic_binarizer.py @@ -21,10 +21,12 @@ from modules.pe import initialize_pe from modules.vocoders.registry import VOCODERS from utils.binarizer_utils import ( + DecomposedWaveform, SinusoidalSmoothingConv1d, get_mel2ph_torch, get_energy_librosa, - get_breathiness_pyworld + get_breathiness_pyworld, + get_tension_base_harmonic, ) from utils.hparams import hparams @@ -37,21 +39,24 @@ 'f0', 'energy', 'breathiness', + 'tension', 'key_shift', - 'speed' + 'speed', ] pitch_extractor: BasePE = None energy_smooth: SinusoidalSmoothingConv1d = None breathiness_smooth: SinusoidalSmoothingConv1d = None +tension_smooth: SinusoidalSmoothingConv1d = None class AcousticBinarizer(BaseBinarizer): def __init__(self): super().__init__(data_attrs=ACOUSTIC_ITEM_ATTRIBUTES) self.lr = LengthRegulator() - self.need_energy = hparams.get('use_energy_embed', False) - self.need_breathiness = hparams.get('use_breathiness_embed', False) + self.need_energy = hparams['use_energy_embed'] + self.need_breathiness = hparams['use_breathiness_embed'] + self.need_tension = hparams['use_tension_embed'] def load_meta_data(self, raw_data_dir: pathlib.Path, ds_id, spk_id): meta_data_dict = {} @@ -108,7 +113,9 @@ def process_item(self, item_name, meta_data, binarization_args): if pitch_extractor is None: pitch_extractor = initialize_pe() gt_f0, uv = pitch_extractor.get_pitch( - wav, length, hparams, interp_uv=hparams['interp_uv'] + wav, samplerate=hparams['audio_sample_rate'], length=length, + hop_size=hparams['hop_size'], f0_min=hparams['f0_min'], f0_max=hparams['f0_max'], + interp_uv=hparams['interp_uv'] ) if uv.all(): # All unvoiced print(f'Skipped \'{item_name}\': empty gt f0') @@ -117,7 +124,9 @@ def process_item(self, item_name, meta_data, binarization_args): if self.need_energy: # get ground truth energy - energy = get_energy_librosa(wav, 
length, hparams).astype(np.float32) + energy = get_energy_librosa( + wav, length, hop_size=hparams['hop_size'], win_size=hparams['win_size'] + ).astype(np.float32) global energy_smooth if energy_smooth is None: @@ -128,9 +137,17 @@ def process_item(self, item_name, meta_data, binarization_args): processed_input['energy'] = energy.cpu().numpy() + # create a DeconstructedWaveform object for further feature extraction + dec_waveform = DecomposedWaveform( + wav, samplerate=hparams['audio_sample_rate'], f0=gt_f0 * ~uv, + hop_size=hparams['hop_size'], fft_size=hparams['fft_size'], win_size=hparams['win_size'] + ) + if self.need_breathiness: # get ground truth breathiness - breathiness = get_breathiness_pyworld(wav, gt_f0 * ~uv, length, hparams).astype(np.float32) + breathiness = get_breathiness_pyworld( + dec_waveform, None, None, length=length + ) global breathiness_smooth if breathiness_smooth is None: @@ -141,6 +158,25 @@ def process_item(self, item_name, meta_data, binarization_args): processed_input['breathiness'] = breathiness.cpu().numpy() + if self.need_tension: + # get ground truth tension + tension = get_tension_base_harmonic( + dec_waveform, None, None, length=length, domain='logit' + ) + + global tension_smooth + if tension_smooth is None: + tension_smooth = SinusoidalSmoothingConv1d( + round(hparams['tension_smooth_width'] / self.timestep) + ).eval().to(self.device) + tension = tension_smooth(torch.from_numpy(tension).to(self.device)[None])[0] + if tension.isnan().any(): + print('Error:', item_name) + print(tension) + return None + + processed_input['tension'] = tension.cpu().numpy() + if hparams.get('use_key_shift_embed', False): processed_input['key_shift'] = 0. diff --git a/preprocessing/variance_binarizer.py b/preprocessing/variance_binarizer.py index ba6b831b1..3478b508f 100644 --- a/preprocessing/variance_binarizer.py +++ b/preprocessing/variance_binarizer.py @@ -14,10 +14,12 @@ from modules.fastspeech.tts_modules import LengthRegulator from modules.pe import initialize_pe from utils.binarizer_utils import ( + DecomposedWaveform, SinusoidalSmoothingConv1d, get_mel2ph_torch, get_energy_librosa, - get_breathiness_pyworld + get_breathiness_pyworld, + get_tension_base_harmonic, ) from utils.hparams import hparams from utils.infer_utils import resample_align_curve @@ -42,6 +44,7 @@ 'uv', # unvoiced masks (only for objective evaluation metrics), bool[T_s,] 'energy', # frame-level RMS (dB), float32[T_s,] 'breathiness', # frame-level RMS of aperiodic parts (dB), float32[T_s,] + 'tension', # frame-level tension (logit), float32[T_s,] ] DS_INDEX_SEP = '#' @@ -51,6 +54,7 @@ midi_smooth: SinusoidalSmoothingConv1d = None energy_smooth: SinusoidalSmoothingConv1d = None breathiness_smooth: SinusoidalSmoothingConv1d = None +tension_smooth: SinusoidalSmoothingConv1d = None class VarianceBinarizer(BaseBinarizer): @@ -70,7 +74,8 @@ def __init__(self): predict_energy = hparams['predict_energy'] predict_breathiness = hparams['predict_breathiness'] - self.predict_variances = predict_energy or predict_breathiness + predict_tension = hparams['predict_tension'] + self.predict_variances = predict_energy or predict_breathiness or predict_tension self.lr = LengthRegulator().to(self.device) self.prefer_ds = self.binarization_args['prefer_ds'] self.cached_ds = {} @@ -275,7 +280,11 @@ def process_item(self, item_name, meta_data, binarization_args): uv = f0 == 0 f0, _ = interp_f0(f0, uv) if f0 is None: - f0, uv = pitch_extractor.get_pitch(waveform, length, hparams, interp_uv=True) + f0, uv = 
pitch_extractor.get_pitch( + waveform, samplerate=hparams['audio_sample_rate'], length=length, + hop_size=hparams['hop_size'], f0_min=hparams['f0_min'], f0_max=hparams['f0_max'], + interp_uv=True + ) if uv.all(): # All unvoiced print(f'Skipped \'{item_name}\': empty gt f0') return None @@ -355,7 +364,10 @@ def process_item(self, item_name, meta_data, binarization_args): align_length=length ) if energy is None: - energy = get_energy_librosa(waveform, length, hparams).astype(np.float32) + energy = get_energy_librosa( + waveform, length, + hop_size=hparams['hop_size'], win_size=hparams['win_size'] + ).astype(np.float32) energy_from_wav = True if energy_from_wav: @@ -368,6 +380,12 @@ def process_item(self, item_name, meta_data, binarization_args): processed_input['energy'] = energy + # create a DeconstructedWaveform object for further feature extraction + dec_waveform = DecomposedWaveform( + waveform, samplerate=hparams['audio_sample_rate'], f0=f0 * ~uv, + hop_size=hparams['hop_size'], fft_size=hparams['fft_size'], win_size=hparams['win_size'] + ) if waveform is not None else None + # Below: extract breathiness if hparams['predict_breathiness']: breathiness = None @@ -384,7 +402,9 @@ def process_item(self, item_name, meta_data, binarization_args): align_length=length ) if breathiness is None: - breathiness = get_breathiness_pyworld(waveform, f0 * ~uv, length, hparams).astype(np.float32) + breathiness = get_breathiness_pyworld( + dec_waveform, None, None, length=length + ) breathiness_from_wav = True if breathiness_from_wav: @@ -397,6 +417,37 @@ def process_item(self, item_name, meta_data, binarization_args): processed_input['breathiness'] = breathiness + # Below: extract tension + if hparams['predict_tension']: + tension = None + tension_from_wav = False + if self.prefer_ds: + tension_seq = self.load_attr_from_ds(ds_id, name, 'tension', idx=ds_seg_idx) + if tension_seq is not None: + tension = resample_align_curve( + np.array(tension_seq.split(), np.float32), + original_timestep=float(self.load_attr_from_ds( + ds_id, name, 'tension_timestep', idx=ds_seg_idx + )), + target_timestep=self.timestep, + align_length=length + ) + if tension is None: + tension = get_tension_base_harmonic( + dec_waveform, None, None, length=length, domain='logit' + ) + tension_from_wav = True + + if tension_from_wav: + global tension_smooth + if tension_smooth is None: + tension_smooth = SinusoidalSmoothingConv1d( + round(hparams['tension_smooth_width'] / self.timestep) + ).eval().to(self.device) + tension = tension_smooth(torch.from_numpy(tension).to(self.device)[None])[0].cpu().numpy() + + processed_input['tension'] = tension + return processed_input def arrange_data_augmentation(self, data_iterator): diff --git a/training/acoustic_task.py b/training/acoustic_task.py index 68cb5f9a4..c903c9623 100644 --- a/training/acoustic_task.py +++ b/training/acoustic_task.py @@ -23,13 +23,15 @@ class AcousticDataset(BaseDataset): def __init__(self, prefix, preload=False): super(AcousticDataset, self).__init__(prefix, hparams['dataset_size_key'], preload) self.required_variances = {} # key: variance name, value: padding value - if hparams.get('use_energy_embed', False): + if hparams['use_energy_embed']: self.required_variances['energy'] = 0.0 - if hparams.get('use_breathiness_embed', False): + if hparams['use_breathiness_embed']: self.required_variances['breathiness'] = 0.0 + if hparams['use_tension_embed']: + self.required_variances['tension'] = 0.0 - self.need_key_shift = hparams.get('use_key_shift_embed', False) - 
self.need_speed = hparams.get('use_speed_embed', False) + self.need_key_shift = hparams['use_key_shift_embed'] + self.need_speed = hparams['use_speed_embed'] self.need_spk_id = hparams['use_spk_id'] def collater(self, samples): @@ -74,10 +76,12 @@ def __init__(self): self.vocoder: BaseVocoder = get_vocoder_cls(hparams)() self.logged_gt_wav = set() self.required_variances = [] - if hparams.get('use_energy_embed', False): + if hparams['use_energy_embed']: self.required_variances.append('energy') - if hparams.get('use_breathiness_embed', False): + if hparams['use_breathiness_embed']: self.required_variances.append('breathiness') + if hparams['use_tension_embed']: + self.required_variances.append('tension') super()._finish_init() def _build_model(self): @@ -163,7 +167,6 @@ def _validation_step(self, sample, batch_idx): self.plot_mel(data_idx, sample['mel'][i], mel_out.diff_out[i], 'diffmel') return losses, sample['size'] - ############ # validation plots ############ diff --git a/training/variance_task.py b/training/variance_task.py index 6ae7bed1d..69fc21207 100644 --- a/training/variance_task.py +++ b/training/variance_task.py @@ -24,7 +24,8 @@ def __init__(self, prefix, preload=False): super(VarianceDataset, self).__init__(prefix, hparams['dataset_size_key'], preload) need_energy = hparams['predict_energy'] need_breathiness = hparams['predict_breathiness'] - self.predict_variances = need_energy or need_breathiness + need_tension = hparams['predict_tension'] + self.predict_variances = need_energy or need_breathiness or need_tension def collater(self, samples): batch = super().collater(samples) @@ -59,6 +60,8 @@ def collater(self, samples): batch['energy'] = utils.collate_nd([s['energy'] for s in samples], 0) if hparams['predict_breathiness']: batch['breathiness'] = utils.collate_nd([s['breathiness'] for s in samples], 0) + if hparams['predict_tension']: + batch['tension'] = utils.collate_nd([s['tension'] for s in samples], 0) return batch @@ -89,11 +92,14 @@ def __init__(self): predict_energy = hparams['predict_energy'] predict_breathiness = hparams['predict_breathiness'] + predict_tension = hparams['predict_tension'] self.variance_prediction_list = [] if predict_energy: self.variance_prediction_list.append('energy') if predict_breathiness: self.variance_prediction_list.append('breathiness') + if predict_tension: + self.variance_prediction_list.append('tension') self.predict_variances = len(self.variance_prediction_list) > 0 self.lambda_var_loss = hparams['lambda_var_loss'] super()._finish_init() @@ -147,6 +153,7 @@ def run_model(self, sample, infer=False): pitch = sample.get('pitch') # [B, T_s] energy = sample.get('energy') # [B, T_s] breathiness = sample.get('breathiness') # [B, T_s] + tension = sample.get('tension') # [B, T_s] pitch_retake = variance_retake = None if (self.predict_pitch or self.predict_variances) and not infer: @@ -168,7 +175,7 @@ def run_model(self, sample, infer=False): note_midi=note_midi, note_rest=note_rest, note_dur=note_dur, note_glide=note_glide, mel2note=mel2note, base_pitch=base_pitch, pitch=pitch, - energy=energy, breathiness=breathiness, + energy=energy, breathiness=breathiness, tension=tension, pitch_retake=pitch_retake, variance_retake=variance_retake, spk_id=spk_ids, infer=infer ) diff --git a/utils/binarizer_utils.py b/utils/binarizer_utils.py index 1b630087e..a89ef433a 100644 --- a/utils/binarizer_utils.py +++ b/utils/binarizer_utils.py @@ -1,8 +1,11 @@ +from typing import Union, Dict + import librosa import numpy as np import parselmouth import pyworld 
as pw
 import torch
+import torch.nn.functional as F
 
 from utils.pitch_utils import interp_f0
 
@@ -20,29 +23,35 @@ def get_mel2ph_torch(lr, durs, length, timestep, device='cpu'):
     return mel2ph
 
 
-def get_pitch_parselmouth(wav_data, length, hparams, speed=1, interp_uv=False):
+def get_pitch_parselmouth(
+        waveform, samplerate, length,
+        *, hop_size, f0_min=65, f0_max=1100,
+        speed=1, interp_uv=False
+):
     """
-    :param wav_data: [T]
+    :param waveform: [T]
+    :param samplerate: sampling rate
     :param length: Expected number of frames
-    :param hparams:
+    :param hop_size: Frame width, in number of samples
+    :param f0_min: Minimum f0 in Hz
+    :param f0_max: Maximum f0 in Hz
     :param speed: Change the speed
     :param interp_uv: Interpolate unvoiced parts
     :return: f0, uv
     """
-    hop_size = int(np.round(hparams['hop_size'] * speed))
-    time_step = hop_size / hparams['audio_sample_rate']
-    f0_min = hparams['f0_min']
-    f0_max = hparams['f0_max']
-
-    l_pad = int(np.ceil(1.5 / f0_min * hparams['audio_sample_rate']))
-    r_pad = hop_size * ((len(wav_data) - 1) // hop_size + 1) - len(wav_data) + l_pad + 1
-    wav_data = np.pad(wav_data, (l_pad, r_pad))
+    hop_size = int(np.round(hop_size * speed))
+    time_step = hop_size / samplerate
+
+    l_pad = int(np.ceil(1.5 / f0_min * samplerate))
+    r_pad = hop_size * ((len(waveform) - 1) // hop_size + 1) - len(waveform) + l_pad + 1
+    waveform = np.pad(waveform, (l_pad, r_pad))
 
     # noinspection PyArgumentList
-    s = parselmouth.Sound(wav_data, sampling_frequency=hparams['audio_sample_rate']).to_pitch_ac(
+    s = parselmouth.Sound(waveform, sampling_frequency=samplerate).to_pitch_ac(
         time_step=time_step, voicing_threshold=0.6,
-        pitch_floor=f0_min, pitch_ceiling=f0_max)
+        pitch_floor=f0_min, pitch_ceiling=f0_max
+    )
     assert np.abs(s.t1 - 1.5 / f0_min) < 0.001
     f0 = s.selected_array['frequency'].astype(np.float32)
     if len(f0) < length:
@@ -54,63 +63,271 @@ def get_pitch_parselmouth(wav_data, length, hparams, speed=1, interp_uv=False):
     return f0, uv
 
 
-def get_energy_librosa(wav_data, length, hparams):
-    """
+class DecomposedWaveform:
+    def __init__(
+            self, waveform, samplerate, f0,  # basic parameters
+            *,
+            hop_size=None, fft_size=None, win_size=None, base_harmonic_radius=3.5,  # analysis parameters
+            device=None  # computation parameters
+    ):
+        # the source components
+        self._waveform = waveform
+        self._samplerate = samplerate
+        self._f0 = f0
+        # extraction parameters
+        self._hop_size = hop_size
+        self._fft_size = fft_size if fft_size is not None else win_size
+        self._win_size = win_size if win_size is not None else fft_size
+        self._time_step = hop_size / samplerate
+        self._half_width = base_harmonic_radius
+        self._device = ('cuda' if torch.cuda.is_available() else 'cpu') if device is None else device
+        # intermediate variables
+        self._f0_world = None
+        self._sp = None
+        self._ap = None
+        # final components
+        self._harmonic_part: np.ndarray = None
+        self._aperiodic_part: np.ndarray = None
+        self._harmonics: Dict[int, np.ndarray] = {}
+
+    @property
+    def samplerate(self):
+        return self._samplerate
+
+    @property
+    def hop_size(self):
+        return self._hop_size
+
+    @property
+    def fft_size(self):
+        return self._fft_size
+
+    @property
+    def win_size(self):
+        return self._win_size
+
+    def _world_extraction(self):
+        # Add a tiny noise to the signal to avoid NaN results of D4C in rare edge cases
+        # References:
+        # - https://github.com/JeremyCCHsu/Python-Wrapper-for-World-Vocoder/issues/50
+        # - https://github.com/mmorise/World/issues/116
+        x = self._waveform.astype(np.double) +
np.random.randn(*self._waveform.shape) * 1e-5 + samplerate = self._samplerate + f0 = self._f0.astype(np.double) + + hop_size = self._hop_size + fft_size = self._fft_size + + wav_frames = (x.shape[0] + hop_size - 1) // hop_size + f0_frames = f0.shape[0] + if f0_frames < wav_frames: + f0 = np.pad(f0, (0, wav_frames - f0_frames), mode='constant', constant_values=(f0[0], f0[-1])) + elif f0_frames > wav_frames: + f0 = f0[:wav_frames] + + time_step = hop_size / samplerate + t = np.arange(0, wav_frames) * time_step + self._f0_world = f0 + self._sp = pw.cheaptrick(x, f0, t, samplerate, fft_size=fft_size) # extract smoothed spectrogram + self._ap = pw.d4c(x, f0, t, samplerate, fft_size=fft_size) # extract aperiodicity + + def _kth_harmonic(self, k: int) -> np.ndarray: + """ + Extract the Kth harmonic (starting from 0) from the waveform. Author: @yxlllc + :param k: a non-negative integer + :return: kth_harmonic float32[T] + """ + if k in self._harmonics: + return self._harmonics[k] + + hop_size = self._hop_size + win_size = self._win_size + samplerate = self._samplerate + half_width = self._half_width + device = self._device + + waveform = torch.from_numpy(self.harmonic()).unsqueeze(0).to(device) # [B, n_samples] + n_samples = waveform.shape[1] + pad_size = (int(n_samples // hop_size) - len(self._f0) + 1) // 2 + f0 = self._f0[pad_size:] * (k + 1) + f0, _ = interp_f0(f0, uv=f0 == 0) + f0 = torch.from_numpy(f0).to(device)[None, :, None] # [B, n_frames, 1] + n_f0_frames = f0.shape[1] + + phase = torch.arange(win_size, dtype=waveform.dtype, device=device) / win_size * 2 * np.pi + nuttall_window = ( + 0.355768 + - 0.487396 * torch.cos(phase) + + 0.144232 * torch.cos(2 * phase) + - 0.012604 * torch.cos(3 * phase) + ) + spec = torch.stft( + waveform, + n_fft=win_size, + win_length=win_size, + hop_length=hop_size, + window=nuttall_window, + center=True, + return_complex=True + ).permute(0, 2, 1) # [B, n_frames, n_spec] + n_spec_frames, n_specs = spec.shape[1:] + idx = torch.arange(n_specs).unsqueeze(0).unsqueeze(0).to(f0) # [1, 1, n_spec] + center = f0 * win_size / samplerate + start = torch.clip(center - half_width, min=0) + end = torch.clip(center + half_width, max=n_specs) + idx_mask = (center >= 1) & (idx >= start) & (idx < end) # [B, n_frames, n_spec] + if n_f0_frames < n_spec_frames: + idx_mask = F.pad(idx_mask, [0, 0, 0, n_spec_frames - n_f0_frames]) + spec = spec * idx_mask[:, :n_spec_frames, :] + self._harmonics[k] = torch.istft( + spec.permute(0, 2, 1), + n_fft=win_size, + win_length=win_size, + hop_length=hop_size, + window=nuttall_window, + center=True, + length=n_samples + ).squeeze(0).cpu().numpy() - :param wav_data: [T] + return self._harmonics[k] + + def harmonic(self, k: int = None) -> np.ndarray: + """ + Extract the full harmonic part, or the Kth harmonic if `k` is not None, from the waveform. 
+        :param k: an integer representing the harmonic index, starting from 0
+        :return: full_harmonics float32[T] or kth_harmonic float32[T]
+        """
+        if k is not None:
+            return self._kth_harmonic(k)
+        if self._harmonic_part is not None:
+            return self._harmonic_part
+        if self._sp is None or self._ap is None:
+            self._world_extraction()
+        self._harmonic_part = pw.synthesize(
+            self._f0_world,
+            np.clip(self._sp * (1 - self._ap * self._ap), a_min=1e-16, a_max=None),  # clip to avoid zeros
+            np.zeros_like(self._ap),
+            self._samplerate, frame_period=self._time_step * 1000
+        ).astype(np.float32)  # synthesize the harmonic part using the parameters
+        return self._harmonic_part
+
+    def aperiodic(self) -> np.ndarray:
+        """
+        Extract the aperiodic part from the waveform.
+        :return: aperiodic_part float32[T]
+        """
+        if self._aperiodic_part is not None:
+            return self._aperiodic_part
+        if self._sp is None or self._ap is None:
+            self._world_extraction()
+        self._aperiodic_part = pw.synthesize(
+            self._f0_world, self._sp * self._ap * self._ap, np.ones_like(self._ap),
+            self._samplerate, frame_period=self._time_step * 1000
+        ).astype(np.float32)  # synthesize the aperiodic part using the parameters
+        return self._aperiodic_part
+
+
+def get_energy_librosa(waveform, length, *, hop_size, win_size, domain='db'):
+    """
+    Definition of energy: RMS of the waveform, in dB representation
+    :param waveform: [T]
     :param length: Expected number of frames
-    :param hparams:
+    :param hop_size: Frame width, in number of samples
+    :param win_size: Window size, in number of samples
+    :param domain: db or amplitude
     :return: energy
     """
-    hop_size = hparams['hop_size']
-    win_size = hparams['win_size']
-
-    energy = librosa.feature.rms(y=wav_data, frame_length=win_size, hop_length=hop_size)[0]
+    energy = librosa.feature.rms(y=waveform, frame_length=win_size, hop_length=hop_size)[0]
     if len(energy) < length:
         energy = np.pad(energy, (0, length - len(energy)))
     energy = energy[: length]
-    energy_db = librosa.amplitude_to_db(energy)
-    return energy_db
+    if domain == 'db':
+        energy = librosa.amplitude_to_db(energy)
+    elif domain == 'amplitude':
+        pass
+    else:
+        raise ValueError(f'Invalid domain: {domain}')
+    return energy
 
 
-def get_breathiness_pyworld(wav_data, f0, length, hparams):
+def get_breathiness_pyworld(
+        waveform: Union[np.ndarray, DecomposedWaveform],
+        samplerate, f0, length,
+        *, hop_size=None, fft_size=None, win_size=None
+):
     """
-
-    :param wav_data: [T]
+    Definition of breathiness: RMS of the aperiodic part, in dB representation
+    :param waveform: All other analysis parameters will not take effect if a DecomposedWaveform is given
+    :param samplerate: sampling rate
     :param f0: reference f0
     :param length: Expected number of frames
-    :param hparams:
+    :param hop_size: Frame width, in number of samples
+    :param fft_size: Number of fft bins
+    :param win_size: Window size, in number of samples
     :return: breathiness
     """
-    sample_rate = hparams['audio_sample_rate']
-    hop_size = hparams['hop_size']
-    fft_size = hparams['fft_size']
-
-    # Add a tiny noise to the signal to avoid NaN results of D4C in rare edge cases
-    # References:
-    # - https://github.com/JeremyCCHsu/Python-Wrapper-for-World-Vocoder/issues/50
-    # - https://github.com/mmorise/World/issues/116
-    x = wav_data.astype(np.double) + np.random.randn(*wav_data.shape) * 1e-5
-    f0 = f0.astype(np.double)
-    wav_frames = (x.shape[0] + hop_size - 1) // hop_size
-    f0_frames = f0.shape[0]
-    if f0_frames < wav_frames:
-        f0 = np.pad(f0, (0, wav_frames - f0_frames), mode='constant', constant_values=(f0[0], f0[-1]))
-    elif f0_frames > wav_frames:
-        f0 = f0[:wav_frames]
-
-    time_step = hop_size / sample_rate
-    t = np.arange(0, wav_frames) * time_step
-    sp = pw.cheaptrick(x, f0, t, sample_rate, fft_size=fft_size)  # extract smoothed spectrogram
-    ap = pw.d4c(x, f0, t, sample_rate, fft_size=fft_size)  # extract aperiodicity
-    y = pw.synthesize(
-        f0, np.clip(sp * ap * ap, a_min=1e-16, a_max=None), np.ones_like(ap), sample_rate,
-        frame_period=time_step * 1000
-    ).astype(np.float32)  # synthesize the aperiodic part using the parameters
-    breathiness = get_energy_librosa(y, length, hparams)
+    if not isinstance(waveform, DecomposedWaveform):
+        waveform = DecomposedWaveform(
+            waveform=waveform, samplerate=samplerate, f0=f0,
+            hop_size=hop_size, fft_size=fft_size, win_size=win_size
+        )
+    waveform_ap = waveform.aperiodic()
+    breathiness = get_energy_librosa(
+        waveform_ap, length=length,
+        hop_size=waveform.hop_size, win_size=waveform.win_size
+    )
     return breathiness
 
 
+def get_tension_base_harmonic(
+        waveform: Union[np.ndarray, DecomposedWaveform],
+        samplerate, f0, length,
+        *, hop_size=None, fft_size=None, win_size=None,
+        domain='logit'
+):
+    """
+    Definition of tension: ratio of the real harmonic part (harmonic part except the base harmonic)
+    to the full harmonic part.
+    :param waveform: All other analysis parameters will not take effect if a DecomposedWaveform is given
+    :param samplerate: sampling rate
+    :param f0: reference f0
+    :param length: Expected number of frames
+    :param hop_size: Frame width, in number of samples
+    :param fft_size: Number of fft bins
+    :param win_size: Window size, in number of samples
+    :param domain: The domain of the final ratio representation.
+        Can be 'ratio' (the raw ratio), 'db' (log decibel) or 'logit' (the inverse function of sigmoid)
+    :return: tension
+    """
+    if not isinstance(waveform, DecomposedWaveform):
+        waveform = DecomposedWaveform(
+            waveform=waveform, samplerate=samplerate, f0=f0,
+            hop_size=hop_size, fft_size=fft_size, win_size=win_size
+        )
+    waveform_h = waveform.harmonic()
+    waveform_base_h = waveform.harmonic(0)
+    energy_base_h = get_energy_librosa(
+        waveform_base_h, length,
+        hop_size=waveform.hop_size, win_size=waveform.win_size,
+        domain='amplitude'
+    )
+    energy_h = get_energy_librosa(
+        waveform_h, length,
+        hop_size=waveform.hop_size, win_size=waveform.win_size,
+        domain='amplitude'
+    )
+    tension = np.sqrt(np.clip(energy_h ** 2 - energy_base_h ** 2, a_min=0, a_max=None)) / (energy_h + 1e-5)
+    if domain == 'ratio':
+        tension = np.clip(tension, a_min=0, a_max=1)
+    elif domain == 'db':
+        tension = np.clip(tension, a_min=1e-5, a_max=1)
+        tension = librosa.amplitude_to_db(tension)
+    elif domain == 'logit':
+        tension = np.clip(tension, a_min=1e-4, a_max=1 - 1e-4)
+        tension = np.log(tension / (1 - tension))
+    return tension
+
+
 class SinusoidalSmoothingConv1d(torch.nn.Conv1d):
     def __init__(self, kernel_size):
         super().__init__(