diff --git a/flac2ogg.py b/flac2ogg.py index eea9dc5..cc0c44a 100755 --- a/flac2ogg.py +++ b/flac2ogg.py @@ -1,14 +1,51 @@ #!/usr/bin/env python +""" +Audio file converter. +This script automate conversion between different type of audio formats. +Conversion can be performed from the following formats: + + - FLAC + - MP3 + - MP4 + - WAVE + - WavePack + - Musepack + - Ogg Vorbis + +Currently supported encoders: + + - Ogg Vorbis + - MP3 (lame) + +There is an option to set the quality for the encoders - for Ogg files there +would be used an `-q' option for `oggenc` command, and for the mp3 format, +`-V` option would be used for `lame` command. Consult corresponding man pages +for details. + +Aim for this script is to produce high quality ogg files out of (preferably) +lossless formats like FLAC/WAVE/WavePack or small-sized mp3 files out of +anything. Of course there is no constraints on what source files would be and +what output format will be, so there is a possibility to create ogg form low +quality mp3 files, nevertheless it doesn't make any sense :) + +license: 3-clause BSD license +version: 1.0 +""" +import argparse +import multiprocessing as mp import os -import sys import re -from argparse import ArgumentParser -from subprocess import Popen, PIPE, check_call -from mutagen import File +import subprocess as sp +import sys + +import mutagen +from mutagen.mp3 import MP3 +from mutagen.easyid3 import EasyID3, EasyID3KeyError -def match_file(path, pattern): +def match_file(path): + """Get all wav files from provided location""" matched_files = [] re_ = re.compile(r"^.*_\d+.wav$") @@ -19,107 +56,418 @@ def match_file(path, pattern): return sorted(matched_files) -def extract_flac(filename, wav): - check_call(["flac", "-d", filename]) - - -def extract_ape(filename, wav): - check_call(["mac", filename, wav, "-d"]) - - -def extract_wv(filename, wav): - check_call(["wvunpack", filename, "-o", wav]) - - -def extract_wav(filename, wav): - """dummy extract method""" - return - - -def extract_m4a(filename, wav): - if "," in wav: - wav = wav.replace(",", "\\,") - - check_call(["mplayer", "-vo", "none", filename, "-ao", - "pcm:file=%s" % wav]) - - -def run(split, files=None): - for filename in files: - base, ext = os.path.splitext(filename) - wav = base + ".wav" - - extract_map = {'.flac': extract_flac, - '.ape': extract_ape, - '.wv': extract_wv, - '.wav': extract_wav, - '.m4a': extract_m4a} - - if ext.lower() not in extract_map: - continue - - extract_map[ext.lower()](filename, wav) - - if split: - cuefile = None - for tmp in (base + ".cue", base + ".wav.cue", - base + ".flac.cue", base + ".wv.cue", - base + ".ape.cue"): - if os.path.exists(tmp): - cuefile = tmp - print "*** cuefile: %s" % cuefile - break - - if cuefile is None: - print "*** no cuefile found" - continue - - pipe = Popen(["cuebreakpoints", cuefile], stdout=PIPE) - check_call(["shnsplit", "-a", base + "_", "-o", "wav", wav], - stdin=pipe.stdout, stdout=PIPE) - - for fn in match_file(os.path.dirname(filename), "_*wav"): - check_call(["oggenc", "-q8", fn]) - os.unlink(fn) - else: - tag = File(filename) - check_call(["oggenc", "-q8", wav]) - ogg_tag = File(base + ".ogg") - try: - ogg_tag.update(tag) - ogg_tag.save() - except: - pass - - os.unlink(wav) - - -def do_walk(split, files=None): - if files and len(files) == 1 and files[0].startswith("*"): - pat = "." + files[0] +def get_filepaths_recursively(pattern=None): + """Gather and return files""" + if pattern and len(pattern) == 1 and pattern[0].startswith("*"): + pat = "." + pattern[0] else: print "Recursive option only works with simple pattern for files match" sys.exit(1) - for root, dirs, files in os.walk("."): - files_ = [] - for f in files: - if re.match(pat, f): - files_.append(os.path.join(root, f)) + files_ = [] + for root, _, files in os.walk("."): + for filename in files: + if re.match(pat, filename): + files_.append(os.path.join(root, filename)) - if files_: - run(split, files_) + return files_ -if __name__ == "__main__": - arg = ArgumentParser(description='Merge MH mail directories') - arg.add_argument("-s", action='store_true', + +def encode(obj): + """Encode files with specified command""" + obj.encode() + + +class CueTrack(object): + def __init__(self): + """Init""" + self.performer = None + self.title = None + + +class CueObjectParser(object): + def __init__(self, cuefile): + """Init""" + self.cuefile = cuefile + self.album_artist = None + self.album = None + self.tracks = [] + self._parse_cue() + + def get_track_data(self, index): + return self.tracks[index].title, self.tracks[index].performer + + def _parse_cue(self): + """Read and parse cuefile""" + with open(self.cuefile) as fobj: + content = fobj.read() + + in_track = False + track = None + + for line in content.split('\n'): + if not in_track: + if line.strip().upper().startswith('REM'): + continue + + if line.strip().upper().startswith('PERFORMER'): + self.album_artist = line.split('"')[1] + continue + + if line.strip().upper().startswith('TITLE'): + self.album = line.split('"')[1] + continue + + if 'TRACK' in line.strip().upper(): + in_track = True + track = CueTrack() + self.tracks.append(track) + continue + + if in_track: + if line.strip().upper().startswith('PERFORMER'): + track.performer = line.split('"')[1] + continue + + if line.strip().upper().startswith('TITLE'): + track.title = line.split('"')[1] + continue + + +class Encoder(object): + """Encoder base class""" + EXT = ".undefined" + + def __init__(self, quality=None): + self.quality = quality + self.ext = self.EXT + + def encode(self, input_fname, output_fname): + """Encode file""" + raise NotImplementedError() + + +class OggEncoder(Encoder): + """Vorbis encoder""" + EXT = ".ogg" + + def __init__(self, quality=None): + """Init""" + super(OggEncoder, self).__init__(quality) + if self.quality is None: + self.quality = 8 + + def encode(self, input_fname, output_fname): + sp.check_call(["oggenc", "-q%s" % self.quality, input_fname, + '-o', output_fname]) + + +class Mp3Encoder(Encoder): + """Mp3 encoder""" + EXT = ".mp3" + + def __init__(self, input_fname, output_fname, quality=None): + """Init""" + super(Mp3Encoder, self).__init__(input_fname, output_fname, quality) + if self.quality is None: + self.quality = 6 + + def encode(self, input_fname, output_fname): + sp.check_call(["lame", "-V%s" % self.quality, input_fname, + output_fname]) + + +class FileType(object): + """Base class for file objects""" + extensions = {'ogg': '.ogg', + 'mp3': '.mp3'} + + def __init__(self, filename, encoder): + self.filename = filename + self.encoder = encoder + + self.out_fname = None + + self.base, self.ext = os.path.splitext(filename) + self.ext = self.ext.lower() + self.wav = self.base + ".wav" + + self.tmp_wav_remove = True + + self.album = None + self.album_artist = None + self.album = None + self.title = None + self.performer = None + + def extract_wav(self): + """Dummy function for wav files""" + raise NotImplementedError() + + def encode(self): + """Encode and tag file""" + self.out_fname = self._get_output_fn() + self.extract_wav() + self.encoder.encode(self.wav, self.out_fname) + self._tag_file() + self.cleanup() + + def cleanup(self): + """Remove intermediate wav file""" + if not self.tmp_wav_remove: + return + + os.unlink(self.wav) + + def _get_output_fn(self): + """Get unique name for the output file""" + out = self.base + + while os.path.exists(out + self.encoder.ext): + out = out + "_encoded_" + + return out + self.encoder.ext + + def _tag_file(self): + """Transfer tags from source files, or from cue""" + + if self.ext == '.mp3': + tag = MP3(self.filename, ID3=EasyID3) + else: + tag = mutagen.File(self.filename) + + + if self.encoder == 'mp3': + self._mp3_tag(tag) + return + + out_tag = mutagen.File(self.out_fname) + if self.album: + out_tag['album'] = self.album + if self.album_artist: + out_tag['albumartist'] = self.album_artist + if self.performer: + out_tag['artist'] = self.performer + if self.title: + out_tag['title'] = self.title + out_tag.save() + + try: + out_tag.update(tag) + out_tag.save() + except: + pass + + def _mp3_tag(self, tag): + """Special case of tagging mp3 output file""" + mp3_tag = MP3(self.out_fname, ID3=EasyID3) + mp3_tag.add_tags(ID3=EasyID3) + + if self.album: + mp3_tag['album'] = self.album + if self.performer: + mp3_tag['artist'] = self.performer + if self.title: + mp3_tag['title'] = self.title + mp3_tag.save() + + for key, val in tag.items(): + try: + if isinstance(val, list): + mp3_tag[key] = ", ".join(val) + else: + mp3_tag[key] = val + except EasyID3KeyError: + pass # ignore unknown keys + + mp3_tag.save() + + +class FlacType(FileType): + """Flac filetype""" + + def extract_wav(self): + """Call flac to extract flac file to wav""" + sp.check_call(["flac", "-d", self.filename]) + + +class ApeType(FileType): + """Ape filetype""" + + def extract_wav(self): + """Extract mpc file to wav""" + sp.check_call(["mac", self.filename, self.wav, "-d"]) + + +class WvType(FileType): + """Wv filetype""" + + def extract_wav(self): + """Extract wavepack file to wav""" + sp.check_call(["wvunpack", self.filename, "-o", self.wav]) + + +class M4aType(FileType): + """M4a filetype""" + + def __init__(self, filename, encoder): + super(M4aType, self).__init__(filename, encoder) + + def extract_wav(self): + """Extract m4a file to wav""" + wav = self.wav + if "," in wav: + wav = wav.replace(",", "\\,") + sp.check_call(["mplayer", "-vo", "none", self.filename, "-ao", + "pcm:file=%s" % wav]) + + +class WavType(FileType): + """Uncompressed wav filetype""" + + def __init__(self, filename, encoder): + super(WavType, self).__init__(filename, encoder) + self.wav = filename + self.tmp_wav_remove = False + + def extract_wav(self): + """Do nothing, we already unpacked here""" + return + + +class Mp3Type(FileType): + """Mp3 filetype""" + + def extract_wav(self): + """Extract mp3 file to wav""" + sp.check_call(["lame", "--decode", self.filename, self.wav]) + +class OggType(FileType): + """Ogg Vorbis filetype""" + + def extract_wav(self): + """Extract mp3 file to wav""" + sp.check_call(["oggdec", self.filename]) + + +class Converter(object): + """Main class for converting files""" + extract_map = {'.ape': ApeType, + '.flac': FlacType, + '.m4a': M4aType, + '.mp3': Mp3Type, + '.ogg': OggType, + '.wav': WavType, + '.wv': WvType} + + def __init__(self, split=False, files=tuple()): + """Init""" + self.files = files + self._file_objs = [] + self.split = split + + def run(self, encoder): + """Do the conversion""" + self._prepare_files(encoder) + self._encode() + + def _prepare_files(self, encoder): + """Determine file types, and create corresponding objects""" + + for filename in self.files: + base, ext = os.path.splitext(filename) + + if self.split: + self._split_file(filename, base, ext, encoder) + else: + klass = Converter.extract_map.get(ext.lower()) + if not klass: + continue + obj = klass(filename, encoder) + self._file_objs.append(obj) + + def _split_file(self, filename, base, ext, encoder): + """Split file using cue file information""" + wav = base + ".wav" + + cuefile = None + + for tmp in (base + ".cue", base + ".wav.cue", base + ".flac.cue", + base + ".wv.cue", base + ".ape.cue"): + if os.path.exists(tmp): + cuefile = tmp + print "*** cuefile: %s" % cuefile + break + + if cuefile is None: + print "*** No cuefile found for `%s'" % filename + return + + cue = CueObjectParser(cuefile) + + # Extract file to the wav. Note, that object will not be added to the + # list of wavs to encode + klass = Converter.extract_map.get(ext.lower()) + if not klass: + print "*** Cannot find right converter for `%s'" % ext + return + + fobj = klass(filename, encoder) + fobj.extract_wav() + + pipe = sp.Popen(["cuebreakpoints", cuefile], stdout=sp.PIPE) + sp.check_call(["shnsplit", "-a", base + "_", "-o", "wav", wav], + stdin=pipe.stdout, stdout=sp.PIPE) + + filepath = os.path.dirname(filename) + for index, filename in enumerate(match_file(filepath)): + obj = WavType(filename, encoder) + obj.tmp_wav_remove = True + obj.album = cue.album + obj.album_artist = cue.album_artist + obj.album = cue.album + obj.title, obj.performer = cue.get_track_data(index) + self._file_objs.append(obj) + + fobj.cleanup() + + def _encode(self): + """Encode files""" + pool = mp.Pool() + # NOTE: map_async and get with timeout 999 are simply a hack for being + # able to interrupt the process with ctrl+c + pool.map_async(encode, tuple(self._file_objs)).get(999) + + +ENCODERS = {'ogg': OggEncoder, + 'mp3': Mp3Encoder} + + +def main(): + """Main""" + arg = argparse.ArgumentParser(description='Convert between different ' + 'audio file format.') + arg.add_argument('-s', '--split', action='store_true', help='split output file with provided *.cue file') - arg.add_argument("-r", action='store_true', - help='do the files searchng recursive') - arg.add_argument("files", nargs="+", help="files to encode") + arg.add_argument('-r', '--recursive', action='store_true', + help='do the files searching recursive') + arg.add_argument('-e', '--encoder', default='ogg', type=str, + choices=('ogg', 'mp3'), help='encoder to use. Defaults ' + 'to "ogg"') + arg.add_argument('-q', '--quality', help='Quality of the encoded file. ' + 'Consult "lame" and "oggenc" for details. Defaults are ' + '6 for lame and 8 for oggenc.') + arg.add_argument('files', nargs='+', help='files to encode') args = arg.parse_args() - if args.r: - do_walk(args.s, args.files) + if args.recursive: + files = get_filepaths_recursively(args.files) else: - run(args.s, args.files) + files = args.files + + conv = Converter(args.split, files) + encoder = ENCODERS[args.encoder](args.quality) + conv.run(encoder) + +if __name__ == "__main__": + main()