1
0
mirror of https://github.com/gryf/ebook-converter.git synced 2026-04-25 16:01:29 +02:00

Use the real constants module.

This is progressing refactor of the calibre code to make it more
readable, and transform it to something more coherent.

In this patch, there are changes regarding imports for some modules,
instead of polluting namespace of each module with some other modules
symbols, which often were imported from other modules. Yuck.
This commit is contained in:
2020-05-29 17:04:53 +02:00
parent ee4801228f
commit ce89f5c9d1
54 changed files with 2383 additions and 2081 deletions
+125 -79
View File
@@ -1,44 +1,43 @@
import re, sys, copy, json
from itertools import repeat
from collections import defaultdict
import collections
import copy
import itertools
import json
import re
import sys
from lxml import etree
from lxml.builder import ElementMaker
from ebook_converter import prints
from ebook_converter.ebooks.metadata import check_isbn, check_doi
from ebook_converter.utils.xml_parse import safe_xml_fromstring
from ebook_converter.ebooks.metadata.book.base import Metadata
from ebook_converter.ebooks.metadata.opf2 import dump_dict
from ebook_converter.utils.date import parse_date, isoformat, now
from ebook_converter.utils.localization import canonicalize_lang, lang_as_iso639_1
from ebook_converter.utils.localization import canonicalize_lang, \
lang_as_iso639_1
__license__ = 'GPL v3'
__copyright__ = '2014, Kovid Goyal <kovid at kovidgoyal.net>'
_xml_declaration = re.compile(r'<\?xml[^<>]+encoding\s*=\s*[\'"](.*?)'
r'[\'"][^<>]*>', re.IGNORECASE)
_xml_declaration = re.compile(r'<\?xml[^<>]+encoding\s*=\s*[\'"](.*?)[\'"][^<>]*>', re.IGNORECASE)
NS_MAP = {
'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#',
'dc': 'http://purl.org/dc/elements/1.1/',
'pdf': 'http://ns.adobe.com/pdf/1.3/',
'pdfx': 'http://ns.adobe.com/pdfx/1.3/',
'xmp': 'http://ns.adobe.com/xap/1.0/',
'xmpidq': 'http://ns.adobe.com/xmp/Identifier/qual/1.0/',
'xmpMM': 'http://ns.adobe.com/xap/1.0/mm/',
'xmpRights': 'http://ns.adobe.com/xap/1.0/rights/',
'xmpBJ': 'http://ns.adobe.com/xap/1.0/bj/',
'xmpTPg': 'http://ns.adobe.com/xap/1.0/t/pg/',
'xmpDM': 'http://ns.adobe.com/xmp/1.0/DynamicMedia/',
'prism': 'http://prismstandard.org/namespaces/basic/2.0/',
'crossmark': 'http://crossref.org/crossmark/1.0/',
'xml': 'http://www.w3.org/XML/1998/namespace',
'x': 'adobe:ns:meta/',
'calibre': 'http://calibre-ebook.com/xmp-namespace',
'calibreSI': 'http://calibre-ebook.com/xmp-namespace-series-index',
'calibreCC': 'http://calibre-ebook.com/xmp-namespace-custom-columns',
}
NS_MAP = {'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#',
'dc': 'http://purl.org/dc/elements/1.1/',
'pdf': 'http://ns.adobe.com/pdf/1.3/',
'pdfx': 'http://ns.adobe.com/pdfx/1.3/',
'xmp': 'http://ns.adobe.com/xap/1.0/',
'xmpidq': 'http://ns.adobe.com/xmp/Identifier/qual/1.0/',
'xmpMM': 'http://ns.adobe.com/xap/1.0/mm/',
'xmpRights': 'http://ns.adobe.com/xap/1.0/rights/',
'xmpBJ': 'http://ns.adobe.com/xap/1.0/bj/',
'xmpTPg': 'http://ns.adobe.com/xap/1.0/t/pg/',
'xmpDM': 'http://ns.adobe.com/xmp/1.0/DynamicMedia/',
'prism': 'http://prismstandard.org/namespaces/basic/2.0/',
'crossmark': 'http://crossref.org/crossmark/1.0/',
'xml': 'http://www.w3.org/XML/1998/namespace',
'x': 'adobe:ns:meta/',
'calibre': 'http://calibre-ebook.com/xmp-namespace',
'calibreSI': 'http://calibre-ebook.com/xmp-namespace-series-index',
'calibreCC': 'http://calibre-ebook.com/xmp-namespace-custom-columns'}
KNOWN_ID_SCHEMES = {'isbn', 'url', 'doi'}
@@ -63,7 +62,7 @@ def parse_xmp_packet(raw_bytes):
pat = r'''<?xpacket\s+[^>]*?begin\s*=\s*['"]([^'"]*)['"]'''
encodings = ('8', '16-le', '16-be', '32-le', '32-be')
header = raw_bytes[:1024]
emap = {'\ufeff'.encode('utf-'+x):'utf-'+x for x in encodings}
emap = {'\ufeff'.encode('utf-'+x): 'utf-'+x for x in encodings}
emap[b''] = 'utf-8'
for q in encodings:
m = re.search(pat.encode('utf-'+q), header)
@@ -71,15 +70,19 @@ def parse_xmp_packet(raw_bytes):
enc = emap.get(m.group(1), enc)
break
if enc is None:
return safe_xml_fromstring(raw_bytes)
raw = _xml_declaration.sub('', raw_bytes.decode(enc)) # lxml barfs if encoding declaration present in unicode string
return safe_xml_fromstring(raw)
return etree.fromstring(raw_bytes)
# lxml barfs if encoding declaration present in unicode string
raw = _xml_declaration.sub('', raw_bytes.decode(enc))
return etree.fromstring(raw)
def serialize_xmp_packet(root, encoding='utf-8'):
root.tail = '\n' + '\n'.join(repeat(' '*100, 30)) # Adobe spec recommends inserting padding at the end of the packet
raw_bytes = etree.tostring(root, encoding=encoding, pretty_print=True, with_tail=True, method='xml')
return b'<?xpacket begin="%s" id="W5M0MpCehiHzreSzNTczkc9d"?>\n%s\n<?xpacket end="w"?>' % ('\ufeff'.encode(encoding), raw_bytes)
# Adobe spec recommends inserting padding at the end of the packet
root.tail = '\n' + '\n'.join(itertools.repeat(' '*100, 30))
raw_bytes = etree.tostring(root, encoding=encoding, pretty_print=True,
with_tail=True, method='xml')
return ('<?xpacket begin="%s" id="W5M0MpCehiHzreSzNTczkc9d"?>\n%s\n'
'<?xpacket end="w"?>' % ('\ufeff'.encode(encoding), raw_bytes))
def read_simple_property(elem):
@@ -106,14 +109,15 @@ def read_sequence(parent):
yield read_simple_property(item)
def uniq(vals, kmap=lambda x:x):
def uniq(vals, kmap=lambda x: x):
''' Remove all duplicates from vals, while preserving order. kmap must be a
callable that returns a hashable value for every item in vals '''
vals = vals or ()
lvals = (kmap(x) for x in vals)
seen = set()
seen_add = seen.add
return tuple(x for x, k in zip(vals, lvals) if k not in seen and not seen_add(k))
return tuple(x for x, k in zip(vals, lvals) if k not in seen
and not seen_add(k))
def multiple_sequences(expr, root):
@@ -170,7 +174,8 @@ def read_series(root):
def read_user_metadata(mi, root):
from ebook_converter.utils.config import from_json
from ebook_converter.ebooks.metadata.book.json_codec import decode_is_multiple
from ebook_converter.ebooks.metadata.book.json_codec import \
decode_is_multiple
fields = set()
for item in XPath('//calibre:custom_metadata')(root):
for li in XPath('./rdf:Bag/rdf:li')(item):
@@ -186,7 +191,7 @@ def read_user_metadata(mi, root):
decode_is_multiple(fm)
mi.set_user_metadata(name, fm)
fields.add(name)
except:
except Exception:
prints('Failed to read user metadata:', name)
import traceback
traceback.print_exc()
@@ -194,13 +199,17 @@ def read_user_metadata(mi, root):
def read_xmp_identifers(parent):
''' For example:
<rdf:li rdf:parseType="Resource"><xmpidq:Scheme>URL</xmp:idq><rdf:value>http://foo.com</rdf:value></rdf:li>
<rdf:li rdf:parseType="Resource"><xmpidq:Scheme>URL</xmp:idq>
<rdf:value>http://foo.com</rdf:value></rdf:li>
or the longer form:
<rdf:li><rdf:Description><xmpidq:Scheme>URL</xmp:idq><rdf:value>http://foo.com</rdf:value></rdf:Description></rdf:li>
<rdf:li><rdf:Description><xmpidq:Scheme>URL</xmp:idq>
<rdf:value>http://foo.com</rdf:value></rdf:Description></rdf:li>
'''
for li in XPath('./rdf:Bag/rdf:li')(parent):
is_resource = li.attrib.get(expand('rdf:parseType'), None) == 'Resource'
is_resource = is_resource or (len(li) == 1 and li[0].tag == expand('rdf:Description'))
is_resource = li.attrib.get(expand('rdf:parseType'),
None) == 'Resource'
is_resource = is_resource or (len(li) == 1 and
li[0].tag == expand('rdf:Description'))
if not is_resource:
yield None, li.text or ''
value = XPath('descendant::rdf:value')(li)
@@ -241,12 +250,15 @@ def metadata_from_xmp_packet(raw_bytes):
if title.startswith(r'\376\377'):
# corrupted XMP packet generated by Nitro PDF. See
# https://bugs.launchpad.net/calibre/+bug/1541981
raise ValueError('Corrupted XMP metadata packet detected, probably generated by Nitro PDF')
raise ValueError('Corrupted XMP metadata packet detected, '
'probably generated by Nitro PDF')
mi.title = title
authors = multiple_sequences('//dc:creator', root)
if authors:
mi.authors = authors
tags = multiple_sequences('//dc:subject', root) or multiple_sequences('//pdf:Keywords', root)
tags = multiple_sequences('//dc:subject',
root) or multiple_sequences('//pdf:Keywords',
root)
if tags:
mi.tags = tags
comments = first_alt('//dc:description', root)
@@ -256,8 +268,10 @@ def metadata_from_xmp_packet(raw_bytes):
if publishers:
mi.publisher = publishers[0]
try:
pubdate = parse_date(first_sequence('//dc:date', root) or first_simple('//xmp:CreateDate', root), assume_utc=False)
except:
pubdate = (parse_date(first_sequence('//dc:date', root) or
first_simple('//xmp:CreateDate', root),
assume_utc=False))
except Exception:
pass
else:
mi.pubdate = pubdate
@@ -291,7 +305,7 @@ def metadata_from_xmp_packet(raw_bytes):
if val:
try:
setattr(mi, x, json.loads(val))
except:
except Exception:
pass
languages = multiple_sequences('//dc:language', root)
@@ -319,7 +333,7 @@ def metadata_from_xmp_packet(raw_bytes):
identifiers[scheme] = val
# Check Dublin Core for recognizable identifier types
for scheme, check_func in {'doi':check_doi, 'isbn':check_isbn}.items():
for scheme, check_func in {'doi': check_doi, 'isbn': check_isbn}.items():
if scheme not in identifiers:
val = check_func(first_simple('//dc:identifier', root))
if val:
@@ -359,17 +373,21 @@ def consolidate_metadata(info_mi, info):
else:
prefer_info = info_date > xmp_mi.metadata_date
if prefer_info:
info_mi.title, info_mi.authors, info_mi.tags = info_title, info_authors, info_tags
info_mi.title = info_title
info_mi.authors = info_authors
info_mi.tags = info_tags
else:
# We'll use the xmp tags/authors but fallback to the info ones if the
# xmp does not have tags/authors. smart_update() should have taken care of
# the rest
info_mi.authors, info_mi.tags = (info_authors if xmp_mi.is_null('authors') else xmp_mi.authors), xmp_mi.tags or info_tags
# xmp does not have tags/authors. smart_update() should have taken care
# of the rest
info_mi.authors = (info_authors if xmp_mi.is_null('authors')
else xmp_mi.authors)
info_mi.tags = xmp_mi.tags or info_tags
return info_mi
def nsmap(*args):
return {x:NS_MAP[x] for x in args}
return {x: NS_MAP[x] for x in args}
def create_simple_property(parent, tag, value):
@@ -435,7 +453,8 @@ def create_series(calibre, series, series_index):
def create_user_metadata(calibre, all_user_metadata):
from ebook_converter.utils.config import to_json
from ebook_converter.ebooks.metadata.book.json_codec import object_to_unicode, encode_is_multiple
from ebook_converter.ebooks.metadata.book.json_codec import \
object_to_unicode, encode_is_multiple
s = calibre.makeelement(expand('calibre:custom_metadata'))
calibre.append(s)
@@ -447,7 +466,7 @@ def create_user_metadata(calibre, all_user_metadata):
encode_is_multiple(fm)
fm = object_to_unicode(fm)
fm = json.dumps(fm, default=to_json, ensure_ascii=False)
except:
except Exception:
prints('Failed to write user metadata:', name)
import traceback
traceback.print_exc()
@@ -471,7 +490,8 @@ def metadata_to_xmp_packet(mi):
dc = rdf.makeelement(expand('rdf:Description'), nsmap=nsmap('dc'))
dc.set(expand('rdf:about'), '')
rdf.append(dc)
for prop, tag in {'title':'dc:title', 'comments':'dc:description'}.items():
for prop, tag in {'title': 'dc:title',
'comments': 'dc:description'}.items():
val = mi.get(prop) or ''
create_alt_property(dc, tag, val)
for prop, (tag, ordered) in {'authors': ('dc:creator', True),
@@ -482,18 +502,23 @@ def metadata_to_xmp_packet(mi):
val = [val]
create_sequence_property(dc, tag, val, ordered)
if not mi.is_null('pubdate'):
create_sequence_property(dc, 'dc:date', [isoformat(mi.pubdate, as_utc=False)]) # Adobe spec recommends local time
# Adobe spec recommends local time
create_sequence_property(dc, 'dc:date',
[isoformat(mi.pubdate, as_utc=False)])
if not mi.is_null('languages'):
langs = list(filter(None, map(lambda x:lang_as_iso639_1(x) or canonicalize_lang(x), mi.languages)))
langs = list(filter(None, map(lambda x: lang_as_iso639_1(x) or
canonicalize_lang(x), mi.languages)))
if langs:
create_sequence_property(dc, 'dc:language', langs, ordered=False)
xmp = rdf.makeelement(expand('rdf:Description'), nsmap=nsmap('xmp', 'xmpidq'))
xmp = rdf.makeelement(expand('rdf:Description'),
nsmap=nsmap('xmp', 'xmpidq'))
xmp.set(expand('rdf:about'), '')
rdf.append(xmp)
extra_ids = {}
for x in ('prism', 'pdfx'):
p = extra_ids[x] = rdf.makeelement(expand('rdf:Description'), nsmap=nsmap(x))
p = extra_ids[x] = rdf.makeelement(expand('rdf:Description'),
nsmap=nsmap(x))
p.set(expand('rdf:about'), '')
rdf.append(p)
@@ -503,7 +528,7 @@ def metadata_to_xmp_packet(mi):
for scheme, val in identifiers.items():
if scheme in {'isbn', 'doi'}:
for prefix, parent in extra_ids.items():
ie = parent.makeelement(expand('%s:%s'%(prefix, scheme)))
ie = parent.makeelement(expand('%s:%s' % (prefix, scheme)))
ie.text = val
parent.append(ie)
@@ -511,7 +536,8 @@ def metadata_to_xmp_packet(mi):
d.text = isoformat(now(), as_utc=False)
xmp.append(d)
calibre = rdf.makeelement(expand('rdf:Description'), nsmap=nsmap('calibre', 'calibreSI', 'calibreCC'))
calibre = rdf.makeelement(expand('rdf:Description'),
nsmap=nsmap('calibre', 'calibreSI', 'calibreCC'))
calibre.set(expand('rdf:about'), '')
rdf.append(calibre)
if not mi.is_null('rating'):
@@ -524,7 +550,8 @@ def metadata_to_xmp_packet(mi):
if not mi.is_null('series'):
create_series(calibre, mi.series, mi.series_index)
if not mi.is_null('timestamp'):
create_simple_property(calibre, 'calibre:timestamp', isoformat(mi.timestamp, as_utc=False))
create_simple_property(calibre, 'calibre:timestamp',
isoformat(mi.timestamp, as_utc=False))
for x in ('author_link_map', 'user_categories'):
val = getattr(mi, x, None)
if val:
@@ -550,10 +577,11 @@ def find_used_namespaces(elem):
def find_preferred_prefix(namespace, elems):
for elem in elems:
ans = {v:k for k, v in elem.nsmap.items()}.get(namespace, None)
ans = {v: k for k, v in elem.nsmap.items()}.get(namespace, None)
if ans is not None:
return ans
return find_preferred_prefix(namespace, elem.iterchildren(etree.Element))
return find_preferred_prefix(namespace,
elem.iterchildren(etree.Element))
def find_nsmap(elems):
@@ -562,7 +590,7 @@ def find_nsmap(elems):
used_namespaces |= find_used_namespaces(elem)
ans = {}
used_namespaces -= {NS_MAP['xml'], NS_MAP['x'], None, NS_MAP['rdf']}
rmap = {v:k for k, v in NS_MAP.items()}
rmap = {v: k for k, v in NS_MAP.items()}
i = 0
for ns in used_namespaces:
if ns in rmap:
@@ -578,7 +606,10 @@ def find_nsmap(elems):
def clone_into(parent, elem):
' Clone the element, assuming that all namespace declarations are present in parent '
"""
Clone the element, assuming that all namespace declarations are present
in parent
"""
clone = parent.makeelement(elem.tag)
parent.append(clone)
if elem.text and not elem.text.isspace():
@@ -591,28 +622,38 @@ def clone_into(parent, elem):
def merge_xmp_packet(old, new):
''' Merge metadata present in the old packet that is not present in the new
"""
Merge metadata present in the old packet that is not present in the new
one into the new one. Assumes the new packet was generated by
metadata_to_xmp_packet() '''
metadata_to_xmp_packet()
"""
old, new = parse_xmp_packet(old), parse_xmp_packet(new)
# As per the adobe spec all metadata items have to be present inside top-level rdf:Description containers
# As per the adobe spec all metadata items have to be present inside
# top-level rdf:Description containers
item_xpath = XPath('//rdf:RDF/rdf:Description/*')
# First remove all data fields that metadata_to_xmp_packet() knowns about,
# since either they will have been set or if not present, imply they have
# been cleared
defined_tags = {expand(prefix + ':' + scheme) for prefix in ('prism', 'pdfx') for scheme in KNOWN_ID_SCHEMES}
defined_tags |= {expand('dc:' + x) for x in ('identifier', 'title', 'creator', 'date', 'description', 'language', 'publisher', 'subject')}
defined_tags |= {expand('xmp:' + x) for x in ('MetadataDate', 'Identifier')}
defined_tags = {expand(prefix + ':' + scheme)
for prefix in ('prism', 'pdfx')
for scheme in KNOWN_ID_SCHEMES}
defined_tags |= {expand('dc:' + x)
for x in ('identifier', 'title', 'creator', 'date',
'description', 'language', 'publisher',
'subject')}
defined_tags |= {expand('xmp:' + x)
for x in ('MetadataDate', 'Identifier')}
# For redundancy also remove all fields explicitly set in the new packet
defined_tags |= {x.tag for x in item_xpath(new)}
calibrens = '{%s}' % NS_MAP['calibre']
for elem in item_xpath(old):
if elem.tag in defined_tags or (elem.tag and elem.tag.startswith(calibrens)):
if elem.tag in defined_tags or (elem.tag and
elem.tag.startswith(calibrens)):
elem.getparent().remove(elem)
# Group all items into groups based on their namespaces
groups = defaultdict(list)
groups = collections.defaultdict(list)
for item in item_xpath(new):
ns = item.nsmap[item.prefix]
groups[ns].append(item)
@@ -626,9 +667,14 @@ def merge_xmp_packet(old, new):
root = A.xmpmeta(R.RDF)
rdf = root[0]
for namespace in sorted(groups, key=lambda x:{NS_MAP['dc']:'a', NS_MAP['xmp']:'b', NS_MAP['calibre']:'c'}.get(x, 'z'+x)):
for namespace in sorted(groups,
key=lambda x: {NS_MAP['dc']: 'a',
NS_MAP['xmp']: 'b',
NS_MAP['calibre']: 'c'}.get(x,
'z'+x)):
items = groups[namespace]
desc = rdf.makeelement(expand('rdf:Description'), nsmap=find_nsmap(items))
desc = rdf.makeelement(expand('rdf:Description'),
nsmap=find_nsmap(items))
desc.set(expand('rdf:about'), '')
rdf.append(desc)
for item in items: