diff --git a/ebook_converter/ebooks/metadata/pdf.py b/ebook_converter/ebooks/metadata/pdf.py new file mode 100644 index 0000000..4f7486d --- /dev/null +++ b/ebook_converter/ebooks/metadata/pdf.py @@ -0,0 +1,190 @@ +from __future__ import absolute_import, division, print_function, unicode_literals + +__license__ = 'GPL v3' +__copyright__ = '2008, Kovid Goyal ' +'''Read meta information from PDF files''' + +import os, subprocess, shutil, re +from functools import partial + +from calibre import prints +from calibre.constants import iswindows, ispy3 +from calibre.ptempfile import TemporaryDirectory +from calibre.ebooks.metadata import ( + MetaInformation, string_to_authors, check_isbn, check_doi) +from calibre.utils.ipc.simple_worker import fork_job, WorkerError +from polyglot.builtins import iteritems, unicode_type + + +def get_tools(): + from calibre.ebooks.pdf.pdftohtml import PDFTOHTML + base = os.path.dirname(PDFTOHTML) + suffix = '.exe' if iswindows else '' + pdfinfo = os.path.join(base, 'pdfinfo') + suffix + pdftoppm = os.path.join(base, 'pdftoppm') + suffix + return pdfinfo, pdftoppm + + +def read_info(outputdir, get_cover): + ''' Read info dict and cover from a pdf file named src.pdf in outputdir. + Note that this function changes the cwd to outputdir and is therefore not + thread safe. Run it using fork_job. This is necessary as there is no safe + way to pass unicode paths via command line arguments. 
This also ensures + that if poppler crashes, no stale file handles are left for the original + file, only for src.pdf.''' + os.chdir(outputdir) + pdfinfo, pdftoppm = get_tools() + ans = {} + + try: + raw = subprocess.check_output([pdfinfo, '-enc', 'UTF-8', '-isodates', 'src.pdf']) + except subprocess.CalledProcessError as e: + prints('pdfinfo errored out with return code: %d'%e.returncode) + return None + try: + info_raw = raw.decode('utf-8') + except UnicodeDecodeError: + prints('pdfinfo returned no UTF-8 data') + return None + + for line in info_raw.splitlines(): + if ':' not in line: + continue + field, val = line.partition(':')[::2] + val = val.strip() + if field and val: + ans[field] = val.strip() + + # Now read XMP metadata + # Versions of poppler before 0.47.0 used to print out both the Info dict and + # XMP metadata packet together. However, since that changed in + # https://cgit.freedesktop.org/poppler/poppler/commit/?id=c91483aceb1b640771f572cb3df9ad707e5cad0d + # we can no longer rely on it. 
+ try: + raw = subprocess.check_output([pdfinfo, '-meta', 'src.pdf']).strip() + except subprocess.CalledProcessError as e: + prints('pdfinfo failed to read XML metadata with return code: %d'%e.returncode) + else: + parts = re.split(br'^Metadata:', raw, 1, flags=re.MULTILINE) + if len(parts) > 1: + # old poppler < 0.47.0 + raw = parts[1].strip() + if raw: + ans['xmp_metadata'] = raw + + if get_cover: + try: + subprocess.check_call([pdftoppm, '-singlefile', '-jpeg', '-cropbox', + 'src.pdf', 'cover']) + except subprocess.CalledProcessError as e: + prints('pdftoppm errored out with return code: %d'%e.returncode) + + return ans + + +def page_images(pdfpath, outputdir='.', first=1, last=1, image_format='jpeg', prefix='page-images'): + pdftoppm = get_tools()[1] + outputdir = os.path.abspath(outputdir) + args = {} + if iswindows: + import win32process as w + args['creationflags'] = w.HIGH_PRIORITY_CLASS | w.CREATE_NO_WINDOW + try: + subprocess.check_call([ + pdftoppm, '-cropbox', '-' + image_format, '-f', unicode_type(first), + '-l', unicode_type(last), pdfpath, os.path.join(outputdir, prefix) + ], **args) + except subprocess.CalledProcessError as e: + raise ValueError('Failed to render PDF, pdftoppm errorcode: %s'%e.returncode) + + +def is_pdf_encrypted(path_to_pdf): + if not ispy3 and not isinstance(path_to_pdf, bytes): + path_to_pdf = path_to_pdf.encode('mbcs' if iswindows else 'utf-8') + pdfinfo = get_tools()[0] + raw = subprocess.check_output([pdfinfo, path_to_pdf]) + q = re.search(br'^Encrypted:\s*(\S+)', raw, flags=re.MULTILINE) + if q is not None: + return q.group(1) == b'yes' + return False + + +def get_metadata(stream, cover=True): + with TemporaryDirectory('_pdf_metadata_read') as pdfpath: + stream.seek(0) + with open(os.path.join(pdfpath, 'src.pdf'), 'wb') as f: + shutil.copyfileobj(stream, f) + try: + res = fork_job('calibre.ebooks.metadata.pdf', 'read_info', + (pdfpath, bool(cover))) + except WorkerError as e: + prints(e.orig_tb) + raise RuntimeError('Failed 
to run pdfinfo') + info = res['result'] + with open(res['stdout_stderr'], 'rb') as f: + raw = f.read().strip() + if raw: + prints(raw) + if info is None: + raise ValueError('Could not read info dict from PDF') + covpath = os.path.join(pdfpath, 'cover.jpg') + cdata = None + if cover and os.path.exists(covpath): + with open(covpath, 'rb') as f: + cdata = f.read() + + title = info.get('Title', None) or _('Unknown') + au = info.get('Author', None) + if au is None: + au = [_('Unknown')] + else: + au = string_to_authors(au) + mi = MetaInformation(title, au) + # if isbn is not None: + # mi.isbn = isbn + + creator = info.get('Creator', None) + if creator: + mi.book_producer = creator + + keywords = info.get('Keywords', None) + mi.tags = [] + if keywords: + mi.tags = [x.strip() for x in keywords.split(',')] + isbn = [check_isbn(x) for x in mi.tags if check_isbn(x)] + if isbn: + mi.isbn = isbn = isbn[0] + mi.tags = [x for x in mi.tags if check_isbn(x) != isbn] + + subject = info.get('Subject', None) + if subject: + mi.tags.insert(0, subject) + + if 'xmp_metadata' in info: + from calibre.ebooks.metadata.xmp import consolidate_metadata + mi = consolidate_metadata(mi, info) + + # Look for recognizable identifiers in the info dict, if they were not + # found in the XMP metadata + for scheme, check_func in iteritems({'doi':check_doi, 'isbn':check_isbn}): + if scheme not in mi.get_identifiers(): + for k, v in iteritems(info): + if k != 'xmp_metadata': + val = check_func(v) + if val: + mi.set_identifier(scheme, val) + break + + if cdata: + mi.cover_data = ('jpeg', cdata) + return mi + + +get_quick_metadata = partial(get_metadata, cover=False) + +from calibre.utils.podofo import set_metadata as podofo_set_metadata + + +def set_metadata(stream, mi): + stream.seek(0) + return podofo_set_metadata(stream, mi) diff --git a/ebook_converter/ebooks/metadata/xmp.py b/ebook_converter/ebooks/metadata/xmp.py new file mode 100644 index 0000000..2cc1a5d --- /dev/null +++ 
b/ebook_converter/ebooks/metadata/xmp.py @@ -0,0 +1,647 @@ +#!/usr/bin/env python2 +# vim:fileencoding=utf-8 +from __future__ import absolute_import, division, print_function, unicode_literals + +__license__ = 'GPL v3' +__copyright__ = '2014, Kovid Goyal ' + +import re, sys, copy, json +from itertools import repeat +from collections import defaultdict + +from lxml import etree +from lxml.builder import ElementMaker + +from calibre import prints +from calibre.ebooks.metadata import check_isbn, check_doi +from calibre.utils.xml_parse import safe_xml_fromstring +from calibre.ebooks.metadata.book.base import Metadata +from calibre.ebooks.metadata.opf2 import dump_dict +from calibre.utils.date import parse_date, isoformat, now +from calibre.utils.localization import canonicalize_lang, lang_as_iso639_1 +from polyglot.builtins import iteritems, string_or_bytes, filter + +_xml_declaration = re.compile(r'<\?xml[^<>]+encoding\s*=\s*[\'"](.*?)[\'"][^<>]*>', re.IGNORECASE) + +NS_MAP = { + 'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#', + 'dc': 'http://purl.org/dc/elements/1.1/', + 'pdf': 'http://ns.adobe.com/pdf/1.3/', + 'pdfx': 'http://ns.adobe.com/pdfx/1.3/', + 'xmp': 'http://ns.adobe.com/xap/1.0/', + 'xmpidq': 'http://ns.adobe.com/xmp/Identifier/qual/1.0/', + 'xmpMM': 'http://ns.adobe.com/xap/1.0/mm/', + 'xmpRights': 'http://ns.adobe.com/xap/1.0/rights/', + 'xmpBJ': 'http://ns.adobe.com/xap/1.0/bj/', + 'xmpTPg': 'http://ns.adobe.com/xap/1.0/t/pg/', + 'xmpDM': 'http://ns.adobe.com/xmp/1.0/DynamicMedia/', + 'prism': 'http://prismstandard.org/namespaces/basic/2.0/', + 'crossmark': 'http://crossref.org/crossmark/1.0/', + 'xml': 'http://www.w3.org/XML/1998/namespace', + 'x': 'adobe:ns:meta/', + 'calibre': 'http://calibre-ebook.com/xmp-namespace', + 'calibreSI': 'http://calibre-ebook.com/xmp-namespace-series-index', + 'calibreCC': 'http://calibre-ebook.com/xmp-namespace-custom-columns', +} +KNOWN_ID_SCHEMES = {'isbn', 'url', 'doi'} + + +def expand(name): + prefix, name = 
def expand(name):
    ''' Expand a "prefix:local" name into lxml's "{uri}local" form, using
    NS_MAP to resolve the prefix. '''
    prefix, name = name.partition(':')[::2]
    return '{%s}%s' % (NS_MAP[prefix], name)


xpath_cache = {}


def XPath(expr):
    ''' Compile expr against the XMP namespace map, caching compiled
    XPath objects since the same expressions are evaluated repeatedly. '''
    ans = xpath_cache.get(expr, None)
    if ans is None:
        xpath_cache[expr] = ans = etree.XPath(expr, namespaces=NS_MAP)
    return ans


def parse_xmp_packet(raw_bytes):
    ''' Parse a raw XMP packet, detecting the text encoding from the BOM in
    the xpacket processing instruction, and return the lxml root element. '''
    raw_bytes = raw_bytes.strip()
    enc = None
    # NOTE(review): the "<?xpacket" prefix of this pattern was stripped by
    # mangling; restored so the begin= attribute of the PI can be located
    pat = r'''<?xpacket\s+[^>]*?begin\s*=\s*['"]([^'"]*)['"]'''
    encodings = ('8', '16-le', '16-be', '32-le', '32-be')
    header = raw_bytes[:1024]
    # Map BOM bytes -> encoding name; an empty begin attribute means UTF-8
    emap = {'\ufeff'.encode('utf-'+x): 'utf-'+x for x in encodings}
    emap[b''] = 'utf-8'
    for q in encodings:
        m = re.search(pat.encode('utf-'+q), header)
        if m is not None:
            enc = emap.get(m.group(1), enc)
            break
    if enc is None:
        return safe_xml_fromstring(raw_bytes)
    # lxml barfs if an encoding declaration is present in a unicode string
    raw = _xml_declaration.sub('', raw_bytes.decode(enc))
    return safe_xml_fromstring(raw)


def serialize_xmp_packet(root, encoding='utf-8'):
    ''' Serialize root as a complete XMP packet wrapped in the standard
    xpacket processing instructions. '''
    # Adobe spec recommends inserting padding at the end of the packet
    root.tail = '\n' + '\n'.join(repeat(' '*100, 30))
    raw_bytes = etree.tostring(root, encoding=encoding, pretty_print=True,
                               with_tail=True, method='xml')
    # The <?xpacket?> wrapper had been stripped by mangling, leaving a
    # one-placeholder format string with two arguments (TypeError at
    # runtime); restore the standard wrapper with the XMP packet id
    return b'<?xpacket begin="%s" id="W5M0MpCehiHzreSzNTczkc9d"?>\n%s\n<?xpacket end="w"?>' % (
        '\ufeff'.encode(encoding), raw_bytes)


def read_simple_property(elem):
    ''' Read a simple property: the element text, falling back to the
    rdf:resource attribute. Returns None when elem is None. '''
    if elem is not None:
        if elem.text:
            return elem.text
        return elem.get(expand('rdf:resource'), '')


def read_lang_alt(parent):
    ''' A text value with possible alternate values in different languages;
    prefer the x-default alternative, else take the first one. '''
    items = XPath('descendant::rdf:li[@xml:lang="x-default"]')(parent)
    if items:
        return items[0]
    items = XPath('descendant::rdf:li')(parent)
    if items:
        return items[0]


def read_sequence(parent):
    ''' Yield the values of a sequence or set (assumes simple properties in
    the sequence). '''
    for item in XPath('descendant::rdf:li')(parent):
        yield read_simple_property(item)


def uniq(vals, kmap=lambda x: x):
    ''' Remove all duplicates from vals, while preserving order. kmap must
    be a callable that returns a hashable value for every item in vals. '''
    vals = vals or ()
    lvals = (kmap(x) for x in vals)
    seen = set()
    seen_add = seen.add
    return tuple(x for x, k in zip(vals, lvals) if k not in seen and not seen_add(k))


def multiple_sequences(expr, root):
    ''' Get all values for sequence elements matching expr, ensuring the
    returned list contains distinct non-null elements preserving order. '''
    ans = []
    for item in XPath(expr)(root):
        ans += list(read_sequence(item))
    return list(filter(None, uniq(ans)))


def first_alt(expr, root):
    ''' The first non-empty value of an element matching expr; assumes the
    element contains a language alternate array. '''
    for item in XPath(expr)(root):
        q = read_simple_property(read_lang_alt(item))
        if q:
            return q


def first_simple(expr, root):
    ''' The value for the first occurrence of an element matching expr
    (assumes a simple property). '''
    for item in XPath(expr)(root):
        q = read_simple_property(item)
        if q:
            return q


def first_sequence(expr, root):
    ''' The first item of the first sequence matching expr. '''
    for item in XPath(expr)(root):
        for ans in read_sequence(item):
            return ans


def read_series(root):
    ''' Read the calibre series name and index from a packet. Returns
    (None, None) when absent; index defaults to 1.0 when unparseable. '''
    for item in XPath('//calibre:series')(root):
        val = XPath('descendant::rdf:value')(item)
        if val:
            series = val[0].text
            if series and series.strip():
                series_index = 1.0
                for si in XPath('descendant::calibreSI:series_index')(item):
                    try:
                        series_index = float(si.text)
                    except (TypeError, ValueError):
                        continue
                    else:
                        break
                return series, series_index
    return None, None
def read_user_metadata(mi, root):
    ''' Read calibre custom column metadata (a calibre:custom_metadata
    rdf:Bag) into mi. Column names must start with '#'; values are JSON
    decoded with the calibre codec. Failures are logged, not raised. '''
    from calibre.utils.config import from_json
    from calibre.ebooks.metadata.book.json_codec import decode_is_multiple
    fields = set()
    for item in XPath('//calibre:custom_metadata')(root):
        for li in XPath('./rdf:Bag/rdf:li')(item):
            name = XPath('descendant::calibreCC:name')(li)
            if name:
                name = name[0].text
                if name.startswith('#') and name not in fields:
                    val = XPath('descendant::rdf:value')(li)
                    if val:
                        fm = val[0].text
                        try:
                            fm = json.loads(fm, object_hook=from_json)
                            decode_is_multiple(fm)
                            mi.set_user_metadata(name, fm)
                            fields.add(name)
                        except Exception:  # narrowed from bare except
                            prints('Failed to read user metadata:', name)
                            import traceback
                            traceback.print_exc()


def read_xmp_identifers(parent):
    ''' Yield (scheme, value) pairs from an xmp:Identifier rdf:Bag.
    Handles the compact form (rdf:li parseType="Resource" containing an
    xmpidq:Scheme and an rdf:value) and the longer form with a nested
    rdf:Description; a plain rdf:li yields (None, its text).
    NOTE(review): original docstring examples were lost to markup
    stripping; description above is reconstructed from the code. '''
    for li in XPath('./rdf:Bag/rdf:li')(parent):
        is_resource = li.attrib.get(expand('rdf:parseType'), None) == 'Resource'
        is_resource = is_resource or (len(li) == 1 and li[0].tag == expand('rdf:Description'))
        if not is_resource:
            yield None, li.text or ''
        value = XPath('descendant::rdf:value')(li)
        if not value:
            continue
        value = value[0].text or ''
        scheme = XPath('descendant::xmpidq:Scheme')(li)
        if not scheme:
            yield None, value
        else:
            yield scheme[0].text or '', value


def safe_parse_date(raw):
    ''' Parse raw as a date, returning None for empty or unparseable
    input instead of raising. '''
    if raw:
        try:
            return parse_date(raw)
        except Exception:
            pass


def more_recent(one, two):
    ''' Return the more recent of two dates, tolerating None on either
    side and uncomparable values (falls back to the first argument). '''
    if one is None:
        return two
    if two is None:
        return one
    try:
        return max(one, two)
    except Exception:
        return one
def metadata_from_xmp_packet(raw_bytes):
    ''' Build a calibre Metadata object from a raw XMP packet.

    :raises ValueError: for the corrupted packets produced by Nitro PDF.
    '''
    root = parse_xmp_packet(raw_bytes)
    mi = Metadata(_('Unknown'))
    title = first_alt('//dc:title', root)
    if title:
        if title.startswith(r'\376\377'):
            # corrupted XMP packet generated by Nitro PDF. See
            # https://bugs.launchpad.net/calibre/+bug/1541981
            raise ValueError('Corrupted XMP metadata packet detected, probably generated by Nitro PDF')
        mi.title = title
    authors = multiple_sequences('//dc:creator', root)
    if authors:
        mi.authors = authors
    tags = multiple_sequences('//dc:subject', root) or multiple_sequences('//pdf:Keywords', root)
    if tags:
        mi.tags = tags
    comments = first_alt('//dc:description', root)
    if comments:
        mi.comments = comments
    publishers = multiple_sequences('//dc:publisher', root)
    if publishers:
        mi.publisher = publishers[0]
    try:
        pubdate = parse_date(first_sequence('//dc:date', root) or
                             first_simple('//xmp:CreateDate', root), assume_utc=False)
    except Exception:  # narrowed from bare except
        pass
    else:
        mi.pubdate = pubdate
    bkp = first_simple('//xmp:CreatorTool', root)
    if bkp:
        mi.book_producer = bkp
    md = safe_parse_date(first_simple('//xmp:MetadataDate', root))
    mod = safe_parse_date(first_simple('//xmp:ModifyDate', root))
    fd = more_recent(md, mod)
    if fd is not None:
        mi.metadata_date = fd
    rating = first_simple('//calibre:rating', root)
    if rating is not None:
        try:
            rating = float(rating)
            if 0 <= rating <= 10:
                mi.rating = rating
        except (ValueError, TypeError):
            pass
    series, series_index = read_series(root)
    if series:
        mi.series, mi.series_index = series, series_index
    for x in ('title_sort', 'author_sort'):
        for elem in XPath('//calibre:' + x)(root):
            val = read_simple_property(elem)
            if val:
                setattr(mi, x, val)
                break
    for x in ('author_link_map', 'user_categories'):
        val = first_simple('//calibre:'+x, root)
        if val:
            try:
                setattr(mi, x, json.loads(val))
            except Exception:  # narrowed from bare except
                pass

    languages = multiple_sequences('//dc:language', root)
    if languages:
        languages = list(filter(None, map(canonicalize_lang, languages)))
        if languages:
            mi.languages = languages

    identifiers = {}
    for xmpid in XPath('//xmp:Identifier')(root):
        for scheme, value in read_xmp_identifers(xmpid):
            if scheme and value:
                identifiers[scheme.lower()] = value

    for namespace in ('prism', 'pdfx'):
        for scheme in KNOWN_ID_SCHEMES:
            if scheme not in identifiers:
                val = first_simple('//%s:%s' % (namespace, scheme), root)
                scheme = scheme.lower()
                if scheme == 'isbn':
                    val = check_isbn(val)
                elif scheme == 'doi':
                    val = check_doi(val)
                if val:
                    identifiers[scheme] = val

    # Check Dublin Core for recognizable identifier types
    for scheme, check_func in iteritems({'doi': check_doi, 'isbn': check_isbn}):
        if scheme not in identifiers:
            val = check_func(first_simple('//dc:identifier', root))
            if val:
                # BUG FIX: was identifiers['doi'] = val, which filed ISBNs
                # found in dc:identifier under the 'doi' key
                identifiers[scheme] = val

    if identifiers:
        mi.set_identifiers(identifiers)

    read_user_metadata(mi, root)

    return mi


def consolidate_metadata(info_mi, info):
    ''' When both the PDF Info dict and XMP metadata are present, prefer the
    XMP metadata unless the Info ModDate is newer than the XMP MetadataDate.
    This is the algorithm recommended by the PDF spec. '''
    try:
        raw = info['xmp_metadata'].rstrip()
        if not raw:
            return info_mi
        xmp_mi = metadata_from_xmp_packet(raw)
    except Exception:
        import traceback
        traceback.print_exc()
        return info_mi
    info_title, info_authors, info_tags = (
        info_mi.title or _('Unknown'), list(info_mi.authors or ()), list(info_mi.tags or ()))
    info_mi.smart_update(xmp_mi, replace_metadata=True)
    prefer_info = False
    if 'ModDate' in info and hasattr(xmp_mi, 'metadata_date'):
        try:
            info_date = parse_date(info['ModDate'])
        except Exception:
            pass
        else:
            prefer_info = info_date > xmp_mi.metadata_date
    if prefer_info:
        info_mi.title, info_mi.authors, info_mi.tags = info_title, info_authors, info_tags
    else:
        # We'll use the xmp tags/authors but fallback to the info ones if the
        # xmp does not have tags/authors. smart_update() should have taken
        # care of the rest
        info_mi.authors, info_mi.tags = (
            info_authors if xmp_mi.is_null('authors') else xmp_mi.authors), xmp_mi.tags or info_tags
    return info_mi


def nsmap(*args):
    ''' Build an lxml nsmap restricted to the given prefixes. '''
    return {x: NS_MAP[x] for x in args}


def create_simple_property(parent, tag, value):
    ''' Append a simple text property element to parent. '''
    e = parent.makeelement(expand(tag))
    parent.append(e)
    e.text = value


def create_alt_property(parent, tag, value):
    ''' Append a language-alternate (rdf:Alt) property with a single
    x-default value. '''
    e = parent.makeelement(expand(tag))
    parent.append(e)
    alt = e.makeelement(expand('rdf:Alt'))
    e.append(alt)
    li = alt.makeelement(expand('rdf:li'))
    alt.append(li)
    li.set(expand('xml:lang'), 'x-default')
    li.text = value


def create_sequence_property(parent, tag, val, ordered=True):
    ''' Append a sequence property: rdf:Seq when ordered, rdf:Bag
    otherwise, with one rdf:li per value. '''
    e = parent.makeelement(expand(tag))
    parent.append(e)
    seq = e.makeelement(expand('rdf:' + ('Seq' if ordered else 'Bag')))
    e.append(seq)
    for x in val:
        li = seq.makeelement(expand('rdf:li'))
        li.text = x
        seq.append(li)


def create_identifiers(xmp, identifiers):
    ''' Write identifiers as an xmp:Identifier rdf:Bag of
    (xmpidq:Scheme, rdf:value) resources. '''
    xmpid = xmp.makeelement(expand('xmp:Identifier'))
    xmp.append(xmpid)
    bag = xmpid.makeelement(expand('rdf:Bag'))
    xmpid.append(bag)
    for scheme, value in iteritems(identifiers):
        li = bag.makeelement(expand('rdf:li'))
        li.set(expand('rdf:parseType'), 'Resource')
        bag.append(li)
        s = li.makeelement(expand('xmpidq:Scheme'))
        s.text = scheme
        li.append(s)
        val = li.makeelement(expand('rdf:value'))
        li.append(val)
        val.text = value


def create_series(calibre, series, series_index):
    ''' Write the calibre series name and index as a resource element;
    an unparseable index defaults to 1.0. '''
    s = calibre.makeelement(expand('calibre:series'))
    s.set(expand('rdf:parseType'), 'Resource')
    calibre.append(s)
    val = s.makeelement(expand('rdf:value'))
    s.append(val)
    val.text = series
    try:
        series_index = float(series_index)
    except (TypeError, ValueError):
        series_index = 1.0
    si = s.makeelement(expand('calibreSI:series_index'))
    si.text = '%.2f' % series_index
    s.append(si)
def create_user_metadata(calibre, all_user_metadata):
    ''' Serialize calibre custom column metadata into a
    calibre:custom_metadata rdf:Bag; individual column failures are
    logged and skipped. '''
    from calibre.utils.config import to_json
    from calibre.ebooks.metadata.book.json_codec import object_to_unicode, encode_is_multiple

    s = calibre.makeelement(expand('calibre:custom_metadata'))
    calibre.append(s)
    bag = s.makeelement(expand('rdf:Bag'))
    s.append(bag)
    for name, fm in iteritems(all_user_metadata):
        try:
            fm = copy.copy(fm)
            encode_is_multiple(fm)
            fm = object_to_unicode(fm)
            fm = json.dumps(fm, default=to_json, ensure_ascii=False)
        except Exception:  # narrowed from bare except
            prints('Failed to write user metadata:', name)
            import traceback
            traceback.print_exc()
            continue
        li = bag.makeelement(expand('rdf:li'))
        li.set(expand('rdf:parseType'), 'Resource')
        bag.append(li)
        n = li.makeelement(expand('calibreCC:name'))
        li.append(n)
        n.text = name
        val = li.makeelement(expand('rdf:value'))
        val.text = fm
        li.append(val)


def metadata_to_xmp_packet(mi):
    ''' Serialize the calibre Metadata object mi into a complete XMP
    packet (bytes). '''
    A = ElementMaker(namespace=NS_MAP['x'], nsmap=nsmap('x'))
    R = ElementMaker(namespace=NS_MAP['rdf'], nsmap=nsmap('rdf'))
    root = A.xmpmeta(R.RDF)
    rdf = root[0]
    dc = rdf.makeelement(expand('rdf:Description'), nsmap=nsmap('dc'))
    dc.set(expand('rdf:about'), '')
    rdf.append(dc)
    for prop, tag in iteritems({'title': 'dc:title', 'comments': 'dc:description'}):
        val = mi.get(prop) or ''
        create_alt_property(dc, tag, val)
    for prop, (tag, ordered) in iteritems({
        'authors': ('dc:creator', True), 'tags': ('dc:subject', False),
        'publisher': ('dc:publisher', False),
    }):
        val = mi.get(prop) or ()
        if isinstance(val, string_or_bytes):
            val = [val]
        create_sequence_property(dc, tag, val, ordered)
    if not mi.is_null('pubdate'):
        # Adobe spec recommends local time
        create_sequence_property(dc, 'dc:date', [isoformat(mi.pubdate, as_utc=False)])
    if not mi.is_null('languages'):
        langs = list(filter(None, map(
            lambda x: lang_as_iso639_1(x) or canonicalize_lang(x), mi.languages)))
        if langs:
            create_sequence_property(dc, 'dc:language', langs, ordered=False)

    xmp = rdf.makeelement(expand('rdf:Description'), nsmap=nsmap('xmp', 'xmpidq'))
    xmp.set(expand('rdf:about'), '')
    rdf.append(xmp)
    extra_ids = {}
    for x in ('prism', 'pdfx'):
        p = extra_ids[x] = rdf.makeelement(expand('rdf:Description'), nsmap=nsmap(x))
        p.set(expand('rdf:about'), '')
        rdf.append(p)

    identifiers = mi.get_identifiers()
    if identifiers:
        create_identifiers(xmp, identifiers)
        for scheme, val in iteritems(identifiers):
            if scheme in {'isbn', 'doi'}:
                # Mirror well-known schemes into the prism and pdfx namespaces
                for prefix, parent in iteritems(extra_ids):
                    ie = parent.makeelement(expand('%s:%s' % (prefix, scheme)))
                    ie.text = val
                    parent.append(ie)

    d = xmp.makeelement(expand('xmp:MetadataDate'))
    d.text = isoformat(now(), as_utc=False)
    xmp.append(d)

    calibre = rdf.makeelement(expand('rdf:Description'),
                              nsmap=nsmap('calibre', 'calibreSI', 'calibreCC'))
    calibre.set(expand('rdf:about'), '')
    rdf.append(calibre)
    if not mi.is_null('rating'):
        try:
            r = float(mi.rating)
        except (TypeError, ValueError):
            pass
        else:
            create_simple_property(calibre, 'calibre:rating', '%g' % r)
    if not mi.is_null('series'):
        create_series(calibre, mi.series, mi.series_index)
    if not mi.is_null('timestamp'):
        create_simple_property(calibre, 'calibre:timestamp',
                               isoformat(mi.timestamp, as_utc=False))
    for x in ('author_link_map', 'user_categories'):
        val = getattr(mi, x, None)
        if val:
            create_simple_property(calibre, 'calibre:'+x, dump_dict(val))

    for x in ('title_sort', 'author_sort'):
        if not mi.is_null(x):
            create_simple_property(calibre, 'calibre:'+x, getattr(mi, x))

    all_user_metadata = mi.get_all_user_metadata(True)
    if all_user_metadata:
        create_user_metadata(calibre, all_user_metadata)
    return serialize_xmp_packet(root)


def find_used_namespaces(elem):
    ''' Return the set of namespace URIs used by elem, its attributes and
    all its descendants (None for unqualified names). '''
    getns = lambda x: (x.partition('}')[0][1:] if '}' in x else None)
    ans = {getns(x) for x in list(elem.attrib) + [elem.tag]}
    for child in elem.iterchildren(etree.Element):
        ans |= find_used_namespaces(child)
    return ans
def find_preferred_prefix(namespace, elems):
    ''' Find the prefix the source elements declare for namespace, looking
    at the first element and, recursively, its children.
    NOTE(review): indentation was lost in the source; reconstructed with the
    recursive fallback inside the loop, i.e. only the first element of elems
    is inspected — confirm against upstream. '''
    for elem in elems:
        ans = {v: k for k, v in iteritems(elem.nsmap)}.get(namespace, None)
        if ans is not None:
            return ans
        return find_preferred_prefix(namespace, elem.iterchildren(etree.Element))


def find_nsmap(elems):
    ''' Build a prefix -> namespace map covering every namespace used by
    elems, preferring the canonical NS_MAP prefixes, then the prefixes the
    source documents used, then generated ns%d names. '''
    used_namespaces = set()
    for elem in elems:
        used_namespaces |= find_used_namespaces(elem)
    ans = {}
    # These are implicit (xml) or declared on the packet wrapper (x, rdf)
    used_namespaces -= {NS_MAP['xml'], NS_MAP['x'], None, NS_MAP['rdf']}
    rmap = {v: k for k, v in iteritems(NS_MAP)}
    i = 0
    for ns in used_namespaces:
        if ns in rmap:
            ans[rmap[ns]] = ns
        else:
            pp = find_preferred_prefix(ns, elems)
            if pp and pp not in ans:
                ans[pp] = ns
            else:
                i += 1
                ans['ns%d' % i] = ns
    return ans


def clone_into(parent, elem):
    ''' Clone the element, assuming that all namespace declarations are
    present in parent. Whitespace-only text/tail is dropped. '''
    clone = parent.makeelement(elem.tag)
    parent.append(clone)
    if elem.text and not elem.text.isspace():
        clone.text = elem.text
    if elem.tail and not elem.tail.isspace():
        clone.tail = elem.tail
    clone.attrib.update(elem.attrib)
    for child in elem.iterchildren(etree.Element):
        clone_into(clone, child)


def merge_xmp_packet(old, new):
    ''' Merge metadata present in the old packet that is not present in the
    new one into the new one. Assumes the new packet was generated by
    metadata_to_xmp_packet(). '''
    old, new = parse_xmp_packet(old), parse_xmp_packet(new)
    # As per the Adobe spec all metadata items have to be present inside
    # top-level rdf:Description containers
    item_xpath = XPath('//rdf:RDF/rdf:Description/*')

    # First remove all data fields that metadata_to_xmp_packet() knows
    # about, since either they will have been set or, if not present, they
    # imply the fields have been cleared
    defined_tags = {expand(prefix + ':' + scheme)
                    for prefix in ('prism', 'pdfx') for scheme in KNOWN_ID_SCHEMES}
    defined_tags |= {expand('dc:' + x) for x in (
        'identifier', 'title', 'creator', 'date', 'description', 'language',
        'publisher', 'subject')}
    defined_tags |= {expand('xmp:' + x) for x in ('MetadataDate', 'Identifier')}
    # For redundancy also remove all fields explicitly set in the new packet
    defined_tags |= {x.tag for x in item_xpath(new)}
    calibrens = '{%s}' % NS_MAP['calibre']
    for elem in item_xpath(old):
        if elem.tag in defined_tags or (elem.tag and elem.tag.startswith(calibrens)):
            elem.getparent().remove(elem)

    # Group all items by their namespace; new items first so they win on order
    groups = defaultdict(list)
    for item in item_xpath(new):
        groups[item.nsmap[item.prefix]].append(item)
    for item in item_xpath(old):
        groups[item.nsmap[item.prefix]].append(item)

    A = ElementMaker(namespace=NS_MAP['x'], nsmap=nsmap('x'))
    R = ElementMaker(namespace=NS_MAP['rdf'], nsmap=nsmap('rdf'))
    root = A.xmpmeta(R.RDF)
    rdf = root[0]

    # Emit dc, then xmp, then calibre, then everything else alphabetically
    for namespace in sorted(groups, key=lambda x: {
            NS_MAP['dc']: 'a', NS_MAP['xmp']: 'b', NS_MAP['calibre']: 'c'}.get(x, 'z'+x)):
        items = groups[namespace]
        desc = rdf.makeelement(expand('rdf:Description'), nsmap=find_nsmap(items))
        desc.set(expand('rdf:about'), '')
        rdf.append(desc)
        for item in items:
            clone_into(desc, item)

    return serialize_xmp_packet(root)


if __name__ == '__main__':
    from calibre.utils.podofo import get_xmp_metadata
    xmp_packet = get_xmp_metadata(sys.argv[-1])
    mi = metadata_from_xmp_packet(xmp_packet)
    np = metadata_to_xmp_packet(mi)
    print(merge_xmp_packet(xmp_packet, np))