#!/usr/bin/env python """ Audio file converter. """ import argparse import multiprocessing as mp import os import re import subprocess as sp import sys import mutagen from mutagen.mp3 import MP3 from mutagen.easyid3 import EasyID3, EasyID3KeyError VERSION = '1.0' def match_file(path): """Get all wav files from provided location""" matched_files = [] re_ = re.compile(r"^.*_\d+.wav$") for filename in os.listdir(os.path.abspath(path)): if re_.match(filename): matched_files.append(os.path.join(path, filename)) return sorted(matched_files) def get_filepaths_recursively(pattern=None): """Gather and return files""" if pattern and len(pattern) == 1 and pattern[0].startswith("*"): pat = "." + pattern[0] else: print("Recursive option only works with simple pattern for files" " match") sys.exit(1) files_ = [] for root, _, files in os.walk("."): for filename in files: if re.match(pat, filename): files_.append(os.path.join(root, filename)) return files_ def encode(obj): """Encode files with specified command""" obj.encode() class CueTrack(object): def __init__(self): """Init""" self.performer = None self.title = None class CueObjectParser(object): def __init__(self, cuefile): """Init""" self.cuefile = cuefile self.album_artist = None self.album = None self.tracks = [] self._parse_cue() def get_track_data(self, index): return self.tracks[index].title, self.tracks[index].performer def _parse_cue(self): """Read and parse cuefile""" with open(self.cuefile) as fobj: content = fobj.read() in_track = False track = None for line in content.split('\n'): if not in_track: if line.strip().upper().startswith('REM'): continue if line.strip().upper().startswith('PERFORMER'): self.album_artist = line.split('"')[1] continue if line.strip().upper().startswith('TITLE'): self.album = line.split('"')[1] continue if 'TRACK' in line.strip().upper(): in_track = True track = CueTrack() self.tracks.append(track) continue if in_track: if line.strip().upper().startswith('PERFORMER'): track.performer = line.split('"')[1] continue if line.strip().upper().startswith('TITLE'): track.title = line.split('"')[1] continue class Encoder(object): """Encoder base class""" EXT = ".undefined" def __init__(self, quality=None): self.quality = quality self.ext = self.EXT def encode(self, input_fname, output_fname): """Encode file""" raise NotImplementedError() class OggEncoder(Encoder): """Vorbis encoder""" EXT = ".ogg" def __init__(self, quality=None): """Init""" super(OggEncoder, self).__init__(quality) if self.quality is None: self.quality = 8 def encode(self, input_fname, output_fname): sp.check_call(["oggenc", "-q%s" % self.quality, input_fname, '-o', output_fname]) class Mp3Encoder(Encoder): """Mp3 encoder""" EXT = ".mp3" def __init__(self, quality=None): """Init""" super(Mp3Encoder, self).__init__(quality) if self.quality is None: self.quality = 6 def encode(self, input_fname, output_fname): sp.check_call(["lame", "-V%s" % self.quality, input_fname, output_fname]) class FileType(object): """Base class for file objects""" extensions = {'ogg': '.ogg', 'mp3': '.mp3'} def __init__(self, filename, encoder): self.filename = filename self.encoder = encoder self.out_fname = None self.base, self.ext = os.path.splitext(filename) self.ext = self.ext.lower() self.wav = self.base + ".wav" self.tmp_wav_remove = True self.album = None self.album_artist = None self.title = None self.performer = None def extract_wav(self): """Dummy function for wav files""" raise NotImplementedError() def encode(self): """Encode and tag file""" self.out_fname = self._get_output_fn() self.extract_wav() self.encoder.encode(self.wav, self.out_fname) self._tag_file() self.cleanup() def cleanup(self): """Remove intermediate wav file""" if not self.tmp_wav_remove: return os.unlink(self.wav) def _get_output_fn(self): """Get unique name for the output file""" out = self.base while os.path.exists(out + self.encoder.ext): out = out + "_encoded_" return out + self.encoder.ext def _tag_file(self): """Transfer tags from source files, or from cue""" if self.ext == '.mp3': tag = MP3(self.filename, ID3=EasyID3) else: tag = mutagen.File(self.filename) if isinstance(self.encoder, Mp3Encoder): self._mp3_tag(tag) return out_tag = mutagen.File(self.out_fname) if self.album: out_tag['album'] = self.album if self.album_artist: out_tag['albumartist'] = self.album_artist if self.performer: out_tag['artist'] = self.performer if self.title: out_tag['title'] = self.title out_tag.save() try: out_tag.update(tag) out_tag.save() except: pass def _mp3_tag(self, tag): """Special case of tagging mp3 output file""" mp3_tag = MP3(self.out_fname, ID3=EasyID3) mp3_tag.add_tags(ID3=EasyID3) if self.album: mp3_tag['album'] = self.album if self.performer: mp3_tag['artist'] = self.performer if self.title: mp3_tag['title'] = self.title mp3_tag.save() for key, val in tag.items(): try: if isinstance(val, list): mp3_tag[key] = ", ".join(val) else: mp3_tag[key] = val except EasyID3KeyError: pass # ignore unknown keys mp3_tag.save() class FlacType(FileType): """Flac filetype""" def extract_wav(self): """Call flac to extract flac file to wav""" sp.check_call(["flac", "-d", self.filename]) class ApeType(FileType): """Ape filetype""" def extract_wav(self): """Extract ape file to wav""" sp.check_call(["mac", self.filename, self.wav, "-d"]) class WvType(FileType): """Wv filetype""" def extract_wav(self): """Extract wavepack file to wav""" sp.check_call(["wvunpack", self.filename, "-o", self.wav]) class M4aType(FileType): """M4a filetype""" def __init__(self, filename, encoder): super(M4aType, self).__init__(filename, encoder) def extract_wav(self): """Extract m4a file to wav""" wav = self.wav if "," in wav: wav = wav.replace(",", "\\,") sp.check_call(["mplayer", "-vo", "none", self.filename, "-ao", "pcm:file=%s" % wav]) class WavType(FileType): """Uncompressed wav filetype""" def __init__(self, filename, encoder): super(WavType, self).__init__(filename, encoder) self.wav = filename self.tmp_wav_remove = False def extract_wav(self): """Do nothing, we already unpacked here""" return class Mp3Type(FileType): """Mp3 filetype""" def extract_wav(self): """Extract mp3 file to wav""" sp.check_call(["lame", "--decode", self.filename, self.wav]) class OggType(FileType): """Ogg Vorbis filetype""" def extract_wav(self): """Extract mp3 file to wav""" sp.check_call(["oggdec", self.filename]) class Converter(object): """Main class for converting files""" extract_map = {'.ape': ApeType, '.flac': FlacType, '.m4a': M4aType, '.mp3': Mp3Type, '.ogg': OggType, '.wav': WavType, '.wv': WvType} def __init__(self, split=False, files=tuple()): """Init""" self.files = files self._file_objs = [] self.split = split def run(self, encoder): """Do the conversion""" self._prepare_files(encoder) self._encode() def _prepare_files(self, encoder): """Determine file types, and create corresponding objects""" for filename in self.files: base, ext = os.path.splitext(filename) if self.split: self._split_file(filename, base, ext, encoder) else: klass = Converter.extract_map.get(ext.lower()) if not klass: continue obj = klass(filename, encoder) self._file_objs.append(obj) def _split_file(self, filename, base, ext, encoder): """Split file using cue file information""" wav = base + ".wav" cuefile = None for tmp in (base + ".cue", base + ".wav.cue", base + ".flac.cue", base + ".wv.cue", base + ".ape.cue"): if os.path.exists(tmp): cuefile = tmp print("*** cuefile: %s" % cuefile) break if cuefile is None: print("*** No cuefile found for `%s'" % filename) return cue = CueObjectParser(cuefile) # Extract file to the wav. Note, that object will not be added to the # list of wavs to encode klass = Converter.extract_map.get(ext.lower()) if not klass: print("*** Cannot find right converter for `%s'" % ext) return fobj = klass(filename, encoder) fobj.extract_wav() pipe = sp.Popen(["cuebreakpoints", cuefile], stdout=sp.PIPE) sp.check_call(["shnsplit", "-a", base + "_", "-o", "wav", wav], stdin=pipe.stdout, stdout=sp.PIPE) filepath = os.path.dirname(filename) for index, filename in enumerate(match_file(filepath)): obj = WavType(filename, encoder) obj.tmp_wav_remove = True obj.album = cue.album obj.album_artist = cue.album_artist obj.album = cue.album obj.title, obj.performer = cue.get_track_data(index) self._file_objs.append(obj) fobj.cleanup() def _encode(self): """Encode files""" pool = mp.Pool() # NOTE: map_async and get with timeout 999 are simply a hack for being # able to interrupt the process with ctrl+c pool.map_async(encode, tuple(self._file_objs)).get(999) ENCODERS = {'ogg': OggEncoder, 'mp3': Mp3Encoder} def main(): """Main""" arg = argparse.ArgumentParser(description='Convert between different ' 'audio file format.') arg.add_argument('-s', '--split', action='store_true', help='Split output file with provided *.cue file') arg.add_argument('-r', '--recursive', action='store_true', help='Do the files searching recursive') arg.add_argument('-e', '--encoder', default='ogg', type=str, choices=('ogg', 'mp3'), help='Encoder to use. Defaults ' 'to "ogg"') arg.add_argument('-q', '--quality', help='Quality of the encoded file. ' 'Consult "lame" and "oggenc" for details. Defaults are ' '6 for lame and 8 for oggenc.') arg.add_argument('-v', '--version', action='version', version='%(prog)s v' + VERSION, help='Display version and exit.') arg.add_argument('files', nargs='+', help='File(s) to encode') args = arg.parse_args() if args.recursive: files = get_filepaths_recursively(args.files) else: files = args.files conv = Converter(args.split, files) encoder = ENCODERS[args.encoder](args.quality) conv.run(encoder) if __name__ == "__main__": main()