1
0
mirror of https://github.com/gryf/ebook-converter.git synced 2026-03-27 13:53:32 +01:00

Initial import

This commit is contained in:
2020-03-31 17:15:23 +02:00
commit d97ea9b0bc
311 changed files with 131419 additions and 0 deletions

View File

@@ -0,0 +1,440 @@
#!/usr/bin/env python2
# vim:fileencoding=utf-8
from __future__ import absolute_import, division, print_function, unicode_literals
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal kovid@kovidgoyal.net'
__docformat__ = 'restructuredtext en'
"""
Provides an abstraction for reading and writing metadata in a variety of ebook formats.
"""
import os, sys, re
from calibre import relpath, guess_type, prints, force_unicode
from calibre.utils.config_base import tweaks
from polyglot.builtins import codepoint_to_chr, unicode_type, range, map, zip, getcwd, iteritems, itervalues, as_unicode
from polyglot.urllib import quote, unquote, urlparse
# Compile the user-configurable author separator pattern once, at import time.
try:
    _author_pat = re.compile(tweaks['authors_split_regex'])
except Exception:
    # The tweak value could not be used: report it and fall back to a built-in
    # pattern that matches "and"/"with" (case-insensitive, optional leading
    # comma) surrounded by whitespace.
    prints('Author split regexp:', tweaks['authors_split_regex'],
            'is invalid, using default')
    _author_pat = re.compile(r'(?i),?\s+(and|with)\s+')
def string_to_authors(raw):
    '''
    Split an author string into a list of author names.

    Authors are separated by ``&`` or by anything ``_author_pat`` matches.
    A doubled ``&&`` stands for a literal ampersand inside a single name.
    Empty names are dropped; a false ``raw`` yields ``[]``.
    '''
    if not raw:
        return []
    # Park escaped ampersands on a placeholder so they survive the split.
    protected = raw.replace('&&', '\uffff')
    normalized = _author_pat.sub('&', protected)
    names = []
    for piece in normalized.split('&'):
        name = piece.strip().replace('\uffff', '&')
        if name:
            names.append(name)
    return names
def authors_to_string(authors):
    '''
    Join author names into a single ``' & '`` separated string.

    Literal ampersands inside a name are escaped as ``&&`` and empty names
    are skipped. ``None`` yields the empty string.
    '''
    if authors is None:
        return ''
    escaped = [name.replace('&', '&&') for name in authors if name]
    return ' & '.join(escaped)
def remove_bracketed_text(src, brackets=None):
    '''
    Return ``src`` with every bracketed span (brackets included) removed.

    :param src: Text to filter; it is passed through ``force_unicode`` first.
    :param brackets: Mapping of opening to closing bracket characters.
        Defaults to round, square and curly brackets.

    Nesting is tracked per bracket type. A closing bracket with no matching
    opener is dropped without affecting the counts.
    '''
    if brackets is None:
        brackets = {'(': ')', '[': ']', '{': '}'}
    from collections import Counter
    open_counts = Counter()
    kept = []
    text = force_unicode(src)
    closers = {closer: opener for opener, closer in iteritems(brackets)}
    for ch in text:
        if ch in brackets:
            open_counts[ch] += 1
            continue
        if ch in closers:
            opener = closers[ch]
            if open_counts[opener] > 0:
                open_counts[opener] -= 1
            continue
        # Ordinary character: keep it only when no bracket is currently open.
        if sum(itervalues(open_counts)) < 1:
            kept.append(ch)
    return ''.join(kept)
def author_to_author_sort(author, method=None):
    '''
    Compute the sort form of a single author name.

    :param author: The author name as displayed.
    :param method: ``'copy'``, ``'comma'``, ``'nocomma'`` or another value;
        when None the ``author_sort_copy_method`` tweak is used.
    :return: ``''`` for a false ``author``. The unchanged ``author`` when
        there are fewer than two tokens, when the method is (or is forced to)
        ``'copy'``, when only prefixes/suffixes remain, or when the method is
        ``'comma'`` and the remaining tokens already contain a comma.
        Otherwise the last token is moved to the front, followed by a comma
        unless the method is ``'nocomma'`` or only one token remains.
    '''
    if not author:
        return ''
    words = remove_bracketed_text(author).strip().split()
    if len(words) < 2:
        return author
    if method is None:
        method = tweaks['author_sort_copy_method']

    # Any configured "copy word" in the name forces the name to be kept as-is.
    lowered = frozenset(w.lower() for w in words)
    copy_words = frozenset(w.lower() for w in tweaks['author_name_copywords'])
    if lowered.intersection(copy_words):
        method = 'copy'
    if method == 'copy':
        return author

    # Drop leading prefixes, with or without a trailing full stop.
    prefixes = {force_unicode(p).lower() for p in tweaks['author_name_prefixes']}
    prefixes |= {p+'.' for p in prefixes}
    while words and words[0].lower() in prefixes:
        words = words[1:]
    if not words:
        return author

    # Peel trailing suffixes off, remembering them in their original order.
    suffixes = {force_unicode(s).lower() for s in tweaks['author_name_suffixes']}
    suffixes |= {s+'.' for s in suffixes}
    peeled = []
    while words and words[-1].lower() in suffixes:
        peeled.insert(0, words[-1])
        words = words[:-1]
    if not words:
        return author
    suffix = ' '.join(peeled)

    if method == 'comma' and ',' in ''.join(words):
        return author

    reordered = words[-1:] + words[:-1]
    name_count = len(reordered)
    if suffix:
        reordered.append(suffix)
    if method != 'nocomma' and name_count > 1:
        reordered[0] += ','
    return ' '.join(reordered)
def authors_to_sort_string(authors):
    '''
    Return the sort forms of all ``authors`` joined with ``' & '``.
    '''
    sort_names = [author_to_author_sort(name) for name in authors]
    return ' & '.join(sort_names)
# Cache of compiled leading-article patterns, keyed by the ``lang`` argument
# exactly as the caller passed it (including None).
_title_pats = {}


def get_title_sort_pat(lang=None):
    '''
    Return a compiled, case-insensitive regex matching a leading article.

    :param lang: Language code. When None, the
        ``default_language_for_title_sort`` tweak is used and, if that is
        also None, the result of ``get_lang()``.
    :return: A compiled pattern anchored at the start of the string whose
        group 1 is the matched article. Results are cached per ``lang``.

    The articles come from the ``per_language_title_sort_articles`` tweak,
    falling back to its ``'eng'`` entry and finally to a built-in English
    list when the tweak is unusable.
    '''
    ans = _title_pats.get(lang, None)
    if ans is not None:
        return ans
    q = lang
    from calibre.utils.localization import canonicalize_lang, get_lang
    if lang is None:
        q = tweaks['default_language_for_title_sort']
        if q is None:
            q = get_lang()
    q = canonicalize_lang(q) if q else q
    data = tweaks['per_language_title_sort_articles']
    try:
        ans = data.get(q, None)
    except AttributeError:
        ans = None  # invalid tweak value
    # ``except Exception`` (not a bare except) keeps these fallbacks
    # best-effort without swallowing KeyboardInterrupt/SystemExit.
    try:
        ans = frozenset(ans) if ans else frozenset(data['eng'])
    except Exception:
        ans = frozenset((r'A\s+', r'The\s+', r'An\s+'))
    try:
        # The join is guarded as well: a tweak holding non-string entries
        # must fall back to the default instead of raising TypeError.
        ans = re.compile('^(%s)' % '|'.join(ans), re.IGNORECASE)
    except Exception:
        ans = re.compile(r'^(A|The|An)\s+', re.IGNORECASE)
    _title_pats[lang] = ans
    return ans
# Characters stripped from the start of a title by title_sort(): the ASCII
# single and double quote, code points U+2018 through U+201D, and U+2032/U+2033.
_ignore_starts = '\'"'+''.join(codepoint_to_chr(x) for x in
        list(range(0x2018, 0x201e))+[0x2032, 0x2033])
def title_sort(title, order=None, lang=None):
    '''
    Return the sort form of ``title``.

    :param title: The title text; it is stripped of surrounding whitespace.
    :param order: Sorting mode. When None the ``title_series_sorting`` tweak
        is used. ``'strictly_alphabetic'`` returns the stripped title as-is.
    :param lang: Language passed to :func:`get_title_sort_pat` to select the
        leading articles.

    In every other mode one leading character from ``_ignore_starts`` is
    dropped, and a leading article is moved to the end after ``', '``.
    '''
    mode = tweaks['title_series_sorting'] if order is None else order
    text = title.strip()
    if mode == 'strictly_alphabetic':
        return text
    if text and text[0] in _ignore_starts:
        text = text[1:]
    found = get_title_sort_pat(lang).search(text)
    if not found:
        return text.strip()
    try:
        article = found.group(1)
    except IndexError:
        # The pattern has no group 1: leave the title untouched.
        return text.strip()
    text = text[len(article):] + ', ' + article
    if text[0] in _ignore_starts:
        text = text[1:]
    return text.strip()
# (value, numeral) pairs in descending order, including the subtractive forms.
coding = [
    (1000, "M"), (900, "CM"), (500, "D"), (400, "CD"),
    (100, "C"), (90, "XC"), (50, "L"), (40, "XL"),
    (10, "X"), (9, "IX"), (5, "V"), (4, "IV"), (1, "I"),
]


def roman(num):
    '''
    Return ``num`` as a Roman numeral.

    Values that are not whole numbers in the range 1..3999 are returned
    converted with ``unicode_type`` instead.
    '''
    if num <= 0 or num >= 4000 or int(num) != num:
        return unicode_type(num)
    pieces = []
    for value, numeral in coding:
        # Greedily emit each numeral as often as it still fits.
        while num >= value:
            num -= value
            pieces.append(numeral)
    return ''.join(pieces)
def fmt_sidx(i, fmt='%.2f', use_roman=False):
    '''
    Format a series index for display.

    :param i: The index. None and ``''`` are treated as 1.
    :param fmt: %-format applied to values that are not whole numbers.
    :param use_roman: Render whole numbers via :func:`roman`.
    :return: A string. Values that cannot be converted to a float are
        returned converted with ``unicode_type``.
    '''
    if i is None or i == '':
        i = 1
    try:
        i = float(i)
    except (TypeError, ValueError):
        # float() raises ValueError (not TypeError) for non-numeric strings.
        return unicode_type(i)
    try:
        is_whole = int(i) == i
    except (OverflowError, ValueError):
        # int() rejects infinities and NaN; they are never whole numbers.
        is_whole = False
    if is_whole:
        return roman(int(i)) if use_roman else '%d' % int(i)
    return fmt % i
class Resource(object):

    '''
    Represents a resource (usually a file on the filesystem or a URL pointing
    to the web). Such resources are commonly referred to in OPF files.

    They have the interface:

    :member:`path`
    :member:`mime_type`
    :method:`href`
    '''

    def __init__(self, href_or_path, basedir=None, is_path=True):
        '''
        :param href_or_path: A filesystem path (``is_path=True``) or a URL.
        :param basedir: Directory that relative paths are resolved against.
            None means the working directory at the time of the call.
        :param is_path: Whether ``href_or_path`` is a path rather than a URL.
        '''
        # Resolve the default here, not in the signature: a default of
        # ``getcwd()`` would be evaluated only once, when the class is defined.
        if basedir is None:
            basedir = getcwd()
        self._href = None
        self._basedir = basedir
        self.path = None
        self.fragment = ''
        try:
            self.mime_type = guess_type(href_or_path)[0]
        except Exception:
            self.mime_type = None
        if self.mime_type is None:
            self.mime_type = 'application/octet-stream'
        if is_path:
            path = href_or_path
            if not os.path.isabs(path):
                path = os.path.abspath(os.path.join(basedir, path))
            if isinstance(path, bytes):
                path = path.decode(sys.getfilesystemencoding())
            self.path = path
        else:
            url = urlparse(href_or_path)
            if url[0] not in ('', 'file'):
                # Any other scheme is kept verbatim and never mapped to a path.
                self._href = href_or_path
            else:
                pc = url[2]
                if isinstance(pc, unicode_type):
                    pc = pc.encode('utf-8')
                pc = unquote(pc).decode('utf-8')
                self.path = os.path.abspath(os.path.join(basedir, pc.replace('/', os.sep)))
                self.fragment = unquote(url[-1])

    def href(self, basedir=None):
        '''
        Return a URL pointing to this resource. If it is a file on the filesystem
        the URL is relative to `basedir`.

        `basedir`: If None, the basedir of this resource is used (see :method:`set_basedir`).
        If this resource has no basedir, then the current working directory is used as the basedir.
        '''
        if basedir is None:
            if self._basedir:
                basedir = self._basedir
            else:
                basedir = getcwd()
        if self.path is None:
            return self._href
        f = self.fragment.encode('utf-8') if isinstance(self.fragment, unicode_type) else self.fragment
        frag = '#'+as_unicode(quote(f)) if self.fragment else ''
        if self.path == basedir:
            return ''+frag
        try:
            rpath = relpath(self.path, basedir)
        except OSError:  # On windows path and basedir could be on different drives
            rpath = self.path
        # Normalise separators with matching string types: on Python 3
        # bytes.replace() raises TypeError when given str arguments.
        if isinstance(rpath, unicode_type):
            rpath = rpath.replace(os.sep, '/').encode('utf-8')
        else:
            rpath = rpath.replace(os.sep.encode('ascii'), b'/')
        return as_unicode(quote(rpath))+frag

    def set_basedir(self, path):
        '''Set the directory used by :meth:`href` when none is passed.'''
        self._basedir = path

    def basedir(self):
        '''Return the base directory currently associated with this resource.'''
        return self._basedir

    def __repr__(self):
        return 'Resource(%s, %s)'%(repr(self.path), repr(self.href()))
class ResourceCollection(object):
    '''
    An ordered, list-backed collection of :class:`Resource` objects.
    '''

    def __init__(self):
        self._resources = []

    def __iter__(self):
        for r in self._resources:
            yield r

    def __len__(self):
        return len(self._resources)

    def __getitem__(self, index):
        return self._resources[index]

    def __bool__(self):
        return len(self._resources) > 0

    def __str__(self):
        resources = map(repr, self)
        return '[%s]'%', '.join(resources)

    def __repr__(self):
        return unicode_type(self)

    def append(self, resource):
        '''Add ``resource``; raises ValueError unless it is a Resource.'''
        if not isinstance(resource, Resource):
            raise ValueError('Can only append objects of type Resource')
        self._resources.append(resource)

    def remove(self, resource):
        '''Remove the first occurrence of ``resource``.'''
        self._resources.remove(resource)

    def replace(self, start, end, items):
        'Same as list[start:end] = items'
        self._resources[start:end] = items

    @staticmethod
    def from_directory_contents(top, topdown=True):
        '''
        Build a collection holding one Resource per file found under ``top``.

        :param top: Directory to walk; it becomes the basedir of every resource.
        :param topdown: Passed through to :func:`os.walk`.
        '''
        collection = ResourceCollection()
        # os.walk yields (dirpath, dirnames, filenames). Joining dirpath with
        # the dirnames *list* raises TypeError, so walk the files explicitly.
        for dirpath, _dirnames, filenames in os.walk(top, topdown=topdown):
            for filename in filenames:
                path = os.path.abspath(os.path.join(dirpath, filename))
                res = Resource(path)
                res.set_basedir(top)
                collection.append(res)
        return collection

    def set_basedir(self, path):
        '''Set the base directory of every resource in the collection.'''
        for res in self:
            res.set_basedir(path)
def MetaInformation(title, authors=(_('Unknown'),)):
    ''' Convenient encapsulation of book metadata, needed for compatibility
    @param title: title or ``_('Unknown')`` or a MetaInformation object
    @param authors: List of strings or []

    When ``title`` has both ``title`` and ``authors`` attributes it is
    treated as an existing metadata object and passed on as ``other``.
    '''
    from calibre.ebooks.metadata.book.base import Metadata
    is_metadata_like = hasattr(title, 'title') and hasattr(title, 'authors')
    if not is_metadata_like:
        return Metadata(title, authors, other=None)
    source = title
    return Metadata(source.title, source.authors, other=source)
def check_isbn10(isbn):
    '''
    Return ``isbn`` when its ISBN-10 check character is correct, else None.

    The first nine digits are weighted 1..9 and summed modulo 11; a result
    of 10 is written as ``X``. Any malformed input yields None.
    '''
    try:
        total = 0
        for weight, char in enumerate(isbn[:9], 1):
            total += weight * int(char)
        check = total % 11
        last = isbn[9]
        if check == 10 and last == 'X':
            return isbn
        if check == int(last):
            return isbn
    except Exception:
        pass
    return None
def check_isbn13(isbn):
    '''
    Return ``isbn`` when its ISBN-13 check digit is correct, else None.

    The first twelve digits are weighted alternately 1 and 3; the check
    digit brings the weighted sum up to a multiple of 10. Any malformed
    input yields None.
    '''
    try:
        total = 0
        for position, char in enumerate(isbn[:12]):
            weight = 3 if position % 2 else 1
            total += weight * int(char)
        check = (10 - total % 10) % 10
        if unicode_type(check) == isbn[12]:
            return isbn
    except Exception:
        pass
    return None
def check_isbn(isbn):
    '''
    Validate an ISBN-10 or ISBN-13.

    Everything except digits and ``X`` is removed (after upper-casing).
    Strings of 10 to 13 repetitions of one digit are rejected outright.

    :return: The cleaned ISBN when valid, otherwise None.
    '''
    if not isbn:
        return None
    cleaned = re.sub(r'[^0-9X]', '', isbn.upper())
    if re.match(r'(\d)\1{9,12}$', cleaned) is not None:
        return None
    size = len(cleaned)
    if size == 10:
        return check_isbn10(cleaned)
    if size == 13:
        return check_isbn13(cleaned)
    return None
def check_issn(issn):
    '''
    Validate an ISSN.

    Everything except digits and ``X`` is removed (after upper-casing).
    The first seven digits are weighted 8..2; the check character brings
    the weighted sum up to a multiple of 11, with 10 written as ``X``.

    :return: The cleaned 8 character ISSN when valid, otherwise None.
    '''
    if not issn:
        return None
    issn = re.sub(r'[^0-9X]', '', issn.upper())
    if len(issn) != 8:
        return None
    try:
        digits = [int(char) for char in issn[:7]]
    except ValueError:
        # An 'X' is only legal in the check position.
        return None
    total = sum((8 - i) * d for i, d in enumerate(digits))
    # The outer modulo maps 11 to 0: without it, ISSNs whose check digit
    # is 0 could never validate.
    check = (11 - total % 11) % 11
    expected = 'X' if check == 10 else '%d' % check
    if issn[7] == expected:
        return issn
    return None
def format_isbn(isbn):
    '''
    Return ``isbn`` split into hyphen separated groups.

    Valid ISBN-10s are grouped as 2-4-3-1 characters and ISBN-13s as
    3-2-4-3-1. Input that fails :func:`check_isbn` is returned unchanged.
    '''
    valid = check_isbn(isbn)
    if not valid:
        return isbn
    if len(valid) == 10:
        groups = (valid[:2], valid[2:6], valid[6:9], valid[9])
    else:
        groups = (valid[:3], valid[3:5], valid[5:9], valid[9:12], valid[12])
    return '-'.join(groups)
def check_doi(doi):
    'Check if something that looks like a DOI is present anywhere in the string'
    if not doi:
        return None
    found = re.search(r'10\.\d{4}/\S+', doi)
    return None if found is None else found.group()
def rating_to_stars(value, allow_half_stars=False, star='\u2605', half='½'):
    '''
    Render a rating on the 0-10 scale as a string of stars.

    :param value: The rating; false values count as 0 and the result of
        ``int(value)`` is clamped to 0..10.
    :param allow_half_stars: Append ``half`` for odd ratings.
    :param star: Text used for each whole star (two rating points).
    :param half: Text used for the trailing half star.
    '''
    # The default star is U+2605 BLACK STAR; an empty default would make
    # every whole-star rating render as nothing.
    r = max(0, min(int(value or 0), 10))
    ans = star * (r // 2)
    if allow_half_stars and r % 2:
        ans += half
    return ans

View File

@@ -0,0 +1,203 @@
#!/usr/bin/env python2
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai
from __future__ import absolute_import, division, print_function, unicode_literals
__license__ = 'GPL v3'
__copyright__ = '2010, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import os
from contextlib import closing
from calibre.customize import FileTypePlugin
from calibre.utils.localization import canonicalize_lang
from polyglot.builtins import filter, unicode_type
def is_comic(list_of_names):
    '''
    Return True when every name that has an extension is a JPEG or PNG image.

    Names without a dot and files called ``thumbs.db`` (in any directory,
    any case) are ignored, so an empty list counts as a comic.
    '''
    comic_extensions = {'jpg', 'jpeg', 'png'}
    for name in list_of_names:
        if '.' not in name:
            continue
        if name.lower().rpartition('/')[-1] == 'thumbs.db':
            continue
        if name.rpartition('.')[-1].lower() not in comic_extensions:
            return False
    return True
def archive_type(stream):
    '''
    Sniff the first four bytes of ``stream`` to identify the archive kind.

    :return: ``'zip'``, ``'rar'`` or None. The stream position is restored
        on a best-effort basis.
    '''
    from calibre.utils.zipfile import stringFileHeader
    try:
        start = stream.tell()
    except:
        # Streams that cannot report a position are rewound to the beginning.
        start = 0
    magic = stream.read(4)
    if magic == stringFileHeader:
        kind = 'zip'
    elif magic.startswith(b'Rar'):
        kind = 'rar'
    else:
        kind = None
    try:
        stream.seek(start)
    except Exception:
        pass
    return kind
# File type plugin that replaces an imported KPF archive with the first DOCX
# file found inside it.
class KPFExtract(FileTypePlugin):

    name = 'KPF Extract'
    author = 'Kovid Goyal'
    description = _('Extract the source DOCX file from Amazon Kindle Create KPF files.'
            ' Note this will not contain any edits made in the Kindle Create program itself.')
    file_types = {'kpf'}
    supported_platforms = ['windows', 'osx', 'linux']
    on_import = True

    def run(self, archive):
        # Returns the path of a temporary DOCX copy, or ``archive`` itself
        # when the KPF contains no DOCX member.
        from calibre.utils.zipfile import ZipFile
        with ZipFile(archive, 'r') as zf:
            docx_members = [member for member in zf.namelist()
                            if member.lower().endswith('.docx')]
            if len(docx_members) == 0:
                return archive
            of = self.temporary_file('_kpf_extract.docx')
            with closing(of):
                payload = zf.read(docx_members[0])
                of.write(payload)
        return of.name
# File type plugin for ZIP/RAR imports: archives holding only images are
# re-labelled as CBZ/CBR, and archives holding a single supported e-book have
# that e-book extracted. Anything else is returned untouched.
class ArchiveExtract(FileTypePlugin):

    name = 'Archive Extract'
    author = 'Kovid Goyal'
    description = _('Extract common e-book formats from archive files '
        '(ZIP/RAR). Also try to autodetect if they are actually '
        'CBZ/CBR files.')
    file_types = {'zip', 'rar'}
    supported_platforms = ['windows', 'osx', 'linux']
    on_import = True

    def run(self, archive):
        # Returns the path of the file to import: a temporary copy/extract,
        # or ``archive`` itself when nothing applies.
        from calibre.utils.zipfile import ZipFile
        is_rar = archive.lower().endswith('.rar')
        zf = None
        if is_rar:
            from calibre.utils.unrar import extract_member, names
        else:
            zf = ZipFile(archive, 'r')

        def fname_ok(fname):
            # Ignore thumbnail caches, extension-less names, scene info
            # files and macOS resource forks.
            bn = os.path.basename(fname).lower()
            if bn == 'thumbs.db':
                return False
            if '.' not in bn:
                return False
            if bn.rpartition('.')[-1] in {'diz', 'nfo'}:
                return False
            if '__MACOSX' in fname.split('/'):
                return False
            return True

        # The ZIP handle is released on every exit path.
        try:
            if is_rar:
                fnames = list(names(archive))
            else:
                fnames = zf.namelist()
            fnames = list(filter(fname_ok, fnames))
            if is_comic(fnames):
                ext = '.cbr' if is_rar else '.cbz'
                of = self.temporary_file('_archive_extract'+ext)
                # closing() ensures the temp file is closed even if the copy fails.
                with closing(of):
                    with open(archive, 'rb') as f:
                        of.write(f.read())
                return of.name
            if len(fnames) > 1 or not fnames:
                return archive
            fname = fnames[0]
            ext = os.path.splitext(fname)[1][1:]
            if ext.lower() not in {
                    'lit', 'epub', 'mobi', 'prc', 'rtf', 'pdf', 'mp3', 'pdb',
                    'azw', 'azw1', 'azw3', 'fb2', 'docx', 'doc', 'odt'}:
                return archive

            of = self.temporary_file('_archive_extract.'+ext)
            with closing(of):
                if is_rar:
                    data = extract_member(archive, match=None, name=fname)[1]
                    of.write(data)
                else:
                    of.write(zf.read(fname))
            return of.name
        finally:
            if zf is not None:
                zf.close()
def get_comic_book_info(d, mi, series_index='volume'):
    '''
    Copy the fields of a ComicBookInfo dictionary onto ``mi``.

    :param d: The decoded ComicBookInfo payload (a dict).
    :param mi: Metadata object whose attributes are set.
    :param series_index: Key preferred for the series index, ``'volume'``
        or ``'issue'``; the other one is used as a fallback.
    '''
    # See http://code.google.com/p/comicbookinfo/wiki/Example
    series = d.get('series', '')
    if series.strip():
        mi.series = series
        si = d.get(series_index, None)
        if si is None:
            si = d.get('issue' if series_index == 'volume' else 'volume', None)
        if si is not None:
            try:
                mi.series_index = float(si)
            except Exception:
                mi.series_index = 1
    language = d.get('language', None)
    if language:
        # Canonicalize the value that was just tested: the payload key is
        # 'language', so looking up 'lang' here would always yield None.
        lang = canonicalize_lang(language)
        if lang:
            mi.languages = [lang]
    if d.get('rating', -1) > -1:
        mi.rating = d['rating']
    for x in ('title', 'publisher'):
        y = d.get(x, '').strip()
        if y:
            setattr(mi, x, y)
    tags = d.get('tags', [])
    if tags:
        mi.tags = tags
    authors = []
    for credit in d.get('credits', []):
        if credit.get('role', '') in ('Writer', 'Artist', 'Cartoonist',
                'Creator'):
            x = credit.get('person', '')
            if x:
                # "Last, First" becomes "First Last".
                x = ' '.join((reversed(x.split(', '))))
                authors.append(x)
    if authors:
        mi.authors = authors
    comments = d.get('comments', '')
    if comments and comments.strip():
        mi.comments = comments.strip()
    pubm, puby = d.get('publicationMonth', None), d.get('publicationYear', None)
    if puby is not None:
        from calibre.utils.date import parse_only_date
        from datetime import date
        try:
            # A missing month defaults to June; the day is always the 15th.
            dt = date(puby, 6 if pubm is None else pubm, 15)
            dt = parse_only_date(unicode_type(dt))
            mi.pubdate = dt
        except Exception:
            pass
def parse_comic_comment(comment, series_index='volume'):
    '''
    Build metadata from the JSON comment stored in a comic archive.

    Only the first top-level key starting with ``ComicBookInfo`` is used.
    '''
    # See http://code.google.com/p/comicbookinfo/wiki/Example
    from calibre.ebooks.metadata import MetaInformation
    import json
    mi = MetaInformation(None, None)
    payload = json.loads(comment)
    if not isinstance(payload, dict):
        return mi
    for key in payload:
        if key.startswith('ComicBookInfo'):
            get_comic_book_info(payload[key], mi, series_index=series_index)
            break
    return mi
def get_comic_metadata(stream, stream_type, series_index='volume'):
    '''
    Read the archive comment of a CBZ/CBR stream and parse it as metadata.

    Any other ``stream_type``, or an empty comment, is parsed as an empty
    JSON object.
    '''
    if stream_type == 'cbz':
        from calibre.utils.zipfile import ZipFile
        comment = ZipFile(stream).comment
    elif stream_type == 'cbr':
        from calibre.utils.unrar import comment as get_comment
        comment = get_comment(stream)
    else:
        comment = None
    if not comment:
        comment = b'{}'
    return parse_comic_comment(comment, series_index=series_index)

View File

@@ -0,0 +1,132 @@
#!/usr/bin/env python2
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai
from __future__ import absolute_import, division, print_function, unicode_literals
__license__ = 'GPL v3'
__copyright__ = '2010, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
'''
All fields must have a NULL value represented as None for simple types,
an empty list/dictionary for complex types and (None, None) for cover_data
'''
SOCIAL_METADATA_FIELDS = frozenset([
    'tags',          # Ordered list
    'rating',        # A floating point number between 0 and 10
    'comments',      # A simple HTML enabled string
    'series',        # A simple string
    'series_index',  # A floating point number
    # Of the form { scheme1:value1, scheme2:value2}
    # For example: {'isbn':'123456789', 'doi':'xxxx', ... }
    'identifiers',
])

# The list of names that convert to identifiers when in get and set.
TOP_LEVEL_IDENTIFIERS = frozenset(['isbn'])

PUBLICATION_METADATA_FIELDS = frozenset([
    'title',            # title must never be None. Should be _('Unknown')
    # Pseudo field that can be set, but if not set is auto generated
    # from title and languages
    'title_sort',
    'authors',          # Ordered list. Must never be None, can be [_('Unknown')]
    'author_sort_map',  # Map of sort strings for each author
    # Pseudo field that can be set, but if not set is auto generated
    # from authors and languages
    'author_sort',
    'book_producer',
    'timestamp',        # Dates and times must be timezone aware
    'pubdate',
    'last_modified',
    'rights',
    # So far only known publication type is periodical:calibre
    # If None, means book
    'publication_type',
    'uuid',             # A UUID usually of type 4
    'languages',        # ordered list of languages in this publication
    'publisher',        # Simple string, no special semantics
    # Absolute path to image file encoded in filesystem_encoding
    'cover',
    # Of the form (format, data) where format is, for e.g. 'jpeg', 'png', 'gif'...
    'cover_data',
    # Either thumbnail data, or an object with the attribute
    # image_path which is the path to an image file, encoded
    # in filesystem_encoding
    'thumbnail',
])

# These are used by code, Null values are None.
BOOK_STRUCTURE_FIELDS = frozenset(['toc', 'spine', 'guide', 'manifest'])

USER_METADATA_FIELDS = frozenset([
    # A dict of dicts similar to field_metadata. Each field description dict
    # also contains a value field with the key #value#.
    'user_metadata',
])

DEVICE_METADATA_FIELDS = frozenset([
    'device_collections',  # Ordered list of strings
    'lpath',               # Unicode, / separated
    'size',                # In bytes
    'mime',                # Mimetype of the book file being represented
])

CALIBRE_METADATA_FIELDS = frozenset([
    'application_id',  # An application id, currently set to the db_id.
    'db_id',           # the calibre primary key of the item.
    'formats',         # list of formats (extensions) for this book
    # a dict of user category names, where the value is a list of item names
    # from the book that are in that category
    'user_categories',
    # a dict of author to an associated hyperlink
    'author_link_map',
])

ALL_METADATA_FIELDS = (
    SOCIAL_METADATA_FIELDS | PUBLICATION_METADATA_FIELDS |
    BOOK_STRUCTURE_FIELDS | USER_METADATA_FIELDS |
    DEVICE_METADATA_FIELDS | CALIBRE_METADATA_FIELDS)

# All fields except custom fields
STANDARD_METADATA_FIELDS = (
    SOCIAL_METADATA_FIELDS | PUBLICATION_METADATA_FIELDS |
    BOOK_STRUCTURE_FIELDS | DEVICE_METADATA_FIELDS |
    CALIBRE_METADATA_FIELDS)

# Metadata fields that smart update must do special processing to copy.
SC_FIELDS_NOT_COPIED = frozenset([
    'title', 'title_sort', 'authors', 'author_sort', 'author_sort_map',
    'cover_data', 'tags', 'languages', 'identifiers',
])

# Metadata fields that smart update should copy only if the source is not None
SC_FIELDS_COPY_NOT_NULL = frozenset([
    'device_collections', 'lpath', 'size', 'comments', 'thumbnail',
])

# Metadata fields that smart update should copy without special handling
SC_COPYABLE_FIELDS = (
    SOCIAL_METADATA_FIELDS | PUBLICATION_METADATA_FIELDS |
    BOOK_STRUCTURE_FIELDS | DEVICE_METADATA_FIELDS |
    CALIBRE_METADATA_FIELDS
) - (SC_FIELDS_NOT_COPIED | SC_FIELDS_COPY_NOT_NULL)

# The three excluded fields are rebuilt when needed
SERIALIZABLE_FIELDS = (
    SOCIAL_METADATA_FIELDS | USER_METADATA_FIELDS |
    PUBLICATION_METADATA_FIELDS | CALIBRE_METADATA_FIELDS |
    DEVICE_METADATA_FIELDS
) - frozenset(['device_collections', 'formats', 'cover_data'])

View File

@@ -0,0 +1,841 @@
#!/usr/bin/env python2
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai
from __future__ import absolute_import, division, print_function, unicode_literals
__license__ = 'GPL v3'
__copyright__ = '2010, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import copy, traceback
from calibre import prints
from calibre.constants import DEBUG, ispy3
from calibre.ebooks.metadata.book import (SC_COPYABLE_FIELDS,
SC_FIELDS_COPY_NOT_NULL, STANDARD_METADATA_FIELDS,
TOP_LEVEL_IDENTIFIERS, ALL_METADATA_FIELDS)
from calibre.library.field_metadata import FieldMetadata
from calibre.utils.icu import sort_key
from polyglot.builtins import iteritems, unicode_type, filter, map
# Special sets used to optimize the performance of getting and setting
# attributes on Metadata objects
# SIMPLE_GET: standard fields that Metadata.__getattribute__ reads straight
# from the backing dict (everything except the top level identifiers).
SIMPLE_GET = frozenset(STANDARD_METADATA_FIELDS - TOP_LEVEL_IDENTIFIERS)
# SIMPLE_SET: fields that Metadata.__setattr__ stores directly; 'identifiers'
# is excluded because it is routed through set_identifiers().
SIMPLE_SET = frozenset(SIMPLE_GET - {'identifiers'})
def human_readable(size, precision=2):
    """ Convert a size in bytes into megabytes """
    template = '%.' + unicode_type(precision) + 'f' + 'MB'
    megabytes = size / (1024 * 1024)
    return template % (megabytes,)
# Value that each listed field takes when it is "null". Metadata.__init__
# deep-copies this mapping (minus 'language') as its initial data, and
# is_null()/set_null() compare against and restore these values.
NULL_VALUES = {
    'user_metadata': {},
    'cover_data' : (None, None),
    'tags' : [],
    'identifiers' : {},
    'languages' : [],
    'device_collections': [],
    'author_sort_map': {},
    'authors' : [_('Unknown')],
    'author_sort' : _('Unknown'),
    'title' : _('Unknown'),
    'user_categories' : {},
    'author_link_map' : {},
    # Pseudo field: returned by Metadata when the languages list is empty.
    'language' : 'und'
}
# Module-wide description of the standard fields, shared by all Metadata objects.
field_metadata = FieldMetadata()


def reset_field_metadata():
    '''
    Replace the module level ``field_metadata`` with a fresh FieldMetadata.
    '''
    global field_metadata
    fresh = FieldMetadata()
    field_metadata = fresh
def ck(typ):
    '''Clean an identifier type: lower-case, stripped, without ``:`` or ``,``.'''
    cleaned = icu_lower(typ).strip()
    return cleaned.replace(':', '').replace(',', '')


def cv(val):
    '''Clean an identifier value: stripped, with ``,`` replaced by ``|``.'''
    return val.strip().replace(',', '|')
class Metadata(object):
'''
A class representing all the metadata for a book. The various standard metadata
fields are available as attributes of this object. You can also stick
arbitrary attributes onto this object.
Metadata from custom columns should be accessed via the get() method,
passing in the lookup name for the column, for example: "#mytags".
Use the :meth:`is_null` method to test if a field is null.
This object also has functions to format fields into strings.
The list of standard metadata fields grows with time and is available in
:data:`STANDARD_METADATA_FIELDS`.
Please keep the method based API of this class to a minimum. Every method
becomes a reserved field name.
'''
__calibre_serializable__ = True
def __init__(self, title, authors=(_('Unknown'),), other=None, template_cache=None,
             formatter=None):
    '''
    @param title: title or ``_('Unknown')``
    @param authors: List of strings or []
    @param other: None or a metadata object
    @param template_cache: stored as-is on the instance
    @param formatter: template formatter; a ``SafeFormat`` is created when None
    '''
    # Start from a private copy of the null values. 'language' is a pseudo
    # field derived from 'languages', so it is not stored.
    data = copy.deepcopy(NULL_VALUES)
    data.pop('language')
    object.__setattr__(self, '_data', data)
    if other is None:
        if title:
            self.title = title
        if authors:
            # List of strings or []
            self.author = list(authors)  # Needed for backward compatibility
            self.authors = list(authors)
    else:
        self.smart_update(other)
    from calibre.ebooks.metadata.book.formatter import SafeFormat
    if formatter is None:
        formatter = SafeFormat()
    self.formatter = formatter
    self.template_cache = template_cache
def is_null(self, field):
    '''
    Return True if the value of field is null in this object.
    'null' means it is unknown or evaluates to False. So a title of
    _('Unknown') is null or a language of 'und' is null.

    Be careful with numeric fields since this will return True for zero as
    well as None.

    Also returns True if the field does not exist.
    '''
    try:
        sentinel = NULL_VALUES.get(field, None)
        current = getattr(self, field, None)
        if not current:
            return True
        return current == sentinel
    except:
        return True
def set_null(self, field):
    '''
    Reset ``field`` to a shallow copy of its entry in ``NULL_VALUES``
    (None when the field has no entry).
    '''
    setattr(self, field, copy.copy(NULL_VALUES.get(field)))
def __getattribute__(self, field):
    # Attribute lookup order: standard fields, top level identifiers, the
    # 'language' pseudo field, ordinary attributes, custom (user metadata)
    # fields and finally the '#name_index' alias for a custom field's extra
    # value. Raises AttributeError when nothing matches.
    _data = object.__getattribute__(self, '_data')
    if field in SIMPLE_GET:
        return _data.get(field, None)
    if field in TOP_LEVEL_IDENTIFIERS:
        return _data.get('identifiers').get(field, None)
    if field == 'language':
        # First entry of 'languages', or the null language when there is none.
        try:
            return _data.get('languages', [])[0]
        except:
            return NULL_VALUES['language']
    try:
        return object.__getattribute__(self, field)
    except AttributeError:
        pass
    if field in _data['user_metadata']:
        d = _data['user_metadata'][field]
        val = d['#value#']
        if d['datatype'] != 'composite':
            return val
        if val is None:
            # Store a marker before evaluating so that a template which
            # refers back to this field reads the marker instead of
            # re-entering the evaluation; the result is then cached.
            d['#value#'] = 'RECURSIVE_COMPOSITE FIELD (Metadata) ' + field
            val = d['#value#'] = self.formatter.safe_format(
                                        d['display']['composite_template'],
                                        self,
                                        _('TEMPLATE ERROR'),
                                        self, column_name=field,
                                        template_cache=self.template_cache).strip()
        return val
    if field.startswith('#') and field.endswith('_index'):
        # '#name_index' maps to the extra value of custom field '#name'.
        try:
            return self.get_extra(field[:-6])
        except:
            pass
    raise AttributeError(
            'Metadata object has no attribute named: '+ repr(field))
def __setattr__(self, field, val, extra=None):
    # Route the assignment by field kind. ``extra`` is only stored for
    # custom (user metadata) fields.
    _data = object.__getattribute__(self, '_data')
    if field in SIMPLE_SET:
        # None is replaced by a copy of the field's null value.
        if val is None:
            val = copy.copy(NULL_VALUES.get(field, None))
        _data[field] = val
    elif field in TOP_LEVEL_IDENTIFIERS:
        # Stored inside the identifiers dict; an empty value deletes the entry.
        field, val = self._clean_identifier(field, val)
        identifiers = _data['identifiers']
        identifiers.pop(field, None)
        if val:
            identifiers[field] = val
    elif field == 'identifiers':
        if not val:
            val = copy.copy(NULL_VALUES.get('identifiers', None))
        self.set_identifiers(val)
    elif field == 'language':
        # Replaces the whole languages list; 'und' or an empty value clears it.
        langs = []
        if val and val.lower() != 'und':
            langs = [val]
        _data['languages'] = langs
    elif field in _data['user_metadata']:
        _data['user_metadata'][field]['#value#'] = val
        _data['user_metadata'][field]['#extra#'] = extra
    else:
        # You are allowed to stick arbitrary attributes onto this object as
        # long as they don't conflict with global or user metadata names
        # Don't abuse this privilege
        self.__dict__[field] = val
def __iter__(self):
    '''Iterate over the keys of the backing data dictionary.'''
    data = object.__getattribute__(self, '_data')
    return iter(data)
def has_key(self, key):
    '''Return True if ``key`` is a key of the backing data dictionary.'''
    data = object.__getattribute__(self, '_data')
    return key in data
def deepcopy(self, class_generator=lambda : Metadata(None)):
    ''' Do not use this method unless you know what you are doing, if you
    want to create a simple clone of this object, use :meth:`deepcopy_metadata`
    instead. Class_generator must be a function that returns an instance
    of Metadata or a subclass of it.'''
    clone = class_generator()
    if isinstance(clone, Metadata):
        # Replace the clone's whole instance dict with a deep copy of ours.
        state = copy.deepcopy(self.__dict__)
        object.__setattr__(clone, '__dict__', state)
        return clone
    return None
def deepcopy_metadata(self):
    '''
    Return a new Metadata object whose data is a deep copy of this one's.
    '''
    source = object.__getattribute__(self, '_data')
    clone = Metadata(None)
    object.__setattr__(clone, '_data', copy.deepcopy(source))
    return clone
def get(self, field, default=None):
    '''
    Return the value of ``field``, or ``default`` when looking it up
    raises AttributeError.
    '''
    try:
        value = self.__getattribute__(field)
    except AttributeError:
        return default
    return value
def get_extra(self, field, default=None):
    '''
    Return the extra value stored with custom field ``field``.

    ``default`` is returned when the field has no extra value. Raises
    AttributeError when ``field`` is not a custom field of this object.
    '''
    custom = object.__getattribute__(self, '_data')['user_metadata']
    if field not in custom:
        raise AttributeError(
                'Metadata object has no attribute named: '+ repr(field))
    try:
        return custom[field]['#extra#']
    except:
        return default
def set(self, field, val, extra=None):
    '''
    Method form of attribute assignment that also accepts ``extra``,
    which is forwarded to ``__setattr__``.
    '''
    assign = self.__setattr__
    assign(field, val, extra)
def get_identifiers(self):
    '''
    Return a copy of the identifiers dictionary.
    The dict is small, and the penalty for using a reference where a copy is
    needed is large. Also, we don't want any manipulations of the returned
    dict to show up in the book.
    '''
    data = object.__getattribute__(self, '_data')
    identifiers = data['identifiers']
    if identifiers:
        return copy.deepcopy(identifiers)
    return {}
def _clean_identifier(self, typ, val):
    '''
    Return ``(typ, val)`` with ``ck``/``cv`` applied to whichever of the two
    is true; false values are passed through unchanged.
    '''
    cleaned_type = ck(typ) if typ else typ
    cleaned_value = cv(val) if val else val
    return cleaned_type, cleaned_value
def set_identifiers(self, identifiers):
    '''
    Set all identifiers. Note that if you previously set ISBN, calling
    this method will delete it.
    '''
    cleaned = {}
    for typ, val in iteritems(identifiers):
        # Entries with an empty type or value are dropped.
        if typ and val:
            cleaned[ck(typ)] = cv(val)
    object.__getattribute__(self, '_data')['identifiers'] = cleaned
def set_identifier(self, typ, val):
    'If val is empty, deletes identifier of type typ'
    typ, val = self._clean_identifier(typ, val)
    if typ:
        data = object.__getattribute__(self, '_data')
        identifiers = data['identifiers']
        # Drop any previous value first so an empty val acts as a delete.
        identifiers.pop(typ, None)
        if val:
            identifiers[typ] = val
def has_identifier(self, typ):
    '''Return True if an identifier of type ``typ`` is stored.'''
    data = object.__getattribute__(self, '_data')
    return typ in data['identifiers']
# field-oriented interface. Intended to be the same as in LibraryDatabase
def standard_field_keys(self):
    '''
    Return all possible standard field keys (the module level
    ``STANDARD_METADATA_FIELDS`` set), even if this book doesn't have them.
    '''
    keys = STANDARD_METADATA_FIELDS
    return keys
def custom_field_keys(self):
    '''
    Return an iterator over the keys of the custom fields in this book.
    '''
    data = object.__getattribute__(self, '_data')
    return iter(data['user_metadata'])
def all_field_keys(self):
    '''
    All field keys known by this instance, even if their value is None
    '''
    custom = frozenset(object.__getattribute__(self, '_data')['user_metadata'])
    return frozenset(ALL_METADATA_FIELDS.union(custom))
def metadata_for_field(self, key):
    '''
    Return the metadata describing a standard or custom field (the stored
    object itself, not a copy).
    '''
    if key in self.custom_field_keys():
        return self.get_user_metadata(key, make_copy=False)
    return self.get_standard_metadata(key, make_copy=False)
def all_non_none_fields(self):
    '''
    Return a dictionary containing all non-None metadata fields, including
    the custom ones.
    '''
    data = object.__getattribute__(self, '_data')
    found = {}
    for name in STANDARD_METADATA_FIELDS:
        value = data.get(name, None)
        if value is not None:
            found[name] = value
    # separate these because it uses the self.get(), not _data.get()
    for name in TOP_LEVEL_IDENTIFIERS:
        value = self.get(name, None)
        if value is not None:
            found[name] = value
    custom = data['user_metadata']
    for name in custom:
        value = self.get(name, None)
        if value is None:
            continue
        found[name] = value
        column = custom[name]
        # Series columns also report their index under '<name>_index'.
        if column['datatype'] == 'series':
            found[name+'_index'] = column['#extra#']
    return found
# End of field-oriented interface
# Extended interfaces. These permit one to get copies of metadata dictionaries, and to
# get and set custom field metadata
def get_standard_metadata(self, field, make_copy):
    '''
    return field metadata from the field if it is there. Otherwise return
    None. field is the key name, not the label. Return a copy if requested,
    just in case the user wants to change values in the dict.
    '''
    if field not in field_metadata:
        return None
    if field_metadata[field]['kind'] != 'field':
        return None
    entry = field_metadata[field]
    return copy.deepcopy(entry) if make_copy else entry
def get_all_standard_metadata(self, make_copy):
    '''
    return a dict containing all the standard field metadata associated with
    the book.

    Without ``make_copy`` the shared ``field_metadata`` object itself is
    returned; with it, only entries of kind ``'field'`` are deep-copied.
    '''
    if not make_copy:
        return field_metadata
    copies = {}
    for key in field_metadata:
        if field_metadata[key]['kind'] != 'field':
            continue
        copies[key] = copy.deepcopy(field_metadata[key])
    return copies
def get_all_user_metadata(self, make_copy):
    '''
    return a dict containing all the custom field metadata associated with
    the book.

    Without ``make_copy`` the stored dict itself is returned.
    '''
    user_metadata = object.__getattribute__(self, '_data')['user_metadata']
    if make_copy:
        return {key: copy.deepcopy(user_metadata[key]) for key in user_metadata}
    return user_metadata
def get_user_metadata(self, field, make_copy):
    '''
    return field metadata from the object if it is there. Otherwise return
    None. field is the key name, not the label. Return a copy if requested,
    just in case the user wants to change values in the dict.
    '''
    custom = object.__getattribute__(self, '_data')['user_metadata']
    if field not in custom:
        return None
    entry = custom[field]
    return copy.deepcopy(entry) if make_copy else entry
def set_all_user_metadata(self, metadata):
    '''
    store custom field metadata into the object. Field is the key name
    not the label

    Each entry is shallow-copied. Entries without a ``#value#`` get ``[]``
    for multi-valued text columns and None otherwise. Passing None only
    prints the current stack and changes nothing.
    '''
    if metadata is None:
        traceback.print_stack()
        return
    rebuilt = {}
    for key, meta in iteritems(metadata):
        entry = meta.copy()
        if '#value#' not in entry:
            is_multi_text = entry['datatype'] == 'text' and entry['is_multiple']
            entry['#value#'] = [] if is_multi_text else None
        rebuilt[key] = entry
    object.__getattribute__(self, '_data')['user_metadata'] = rebuilt
def set_user_metadata(self, field, metadata):
    '''
    Store custom field metadata for one column on the object.

    field is the key name, not the label, and must begin with '#';
    otherwise AttributeError is raised. A field of None is silently
    ignored. A metadata of None prints the current stack and stores
    nothing. An entry lacking '#value#' gets an empty list for
    multi-valued text fields and None otherwise.
    '''
    if field is None:
        return
    if not field.startswith('#'):
        raise AttributeError(
            'Custom field name %s must begin with \'#\''%repr(field))
    if metadata is None:
        traceback.print_stack()
        return
    # A shallow copy is enough: the objects referenced by the dict are
    # not expected to change, although they may be replaced.
    entry = dict(metadata)
    if '#value#' not in entry:
        is_text_list = entry['datatype'] == 'text' and entry['is_multiple']
        entry['#value#'] = [] if is_text_list else None
    object.__getattribute__(self, '_data')['user_metadata'][field] = entry
def template_to_attribute(self, other, ops):
    '''
    Takes a list [(src,dest), (src,dest)], evaluates each src template in
    the context of other, then copies the result to self[dest]. This is on
    a best-efforts basis. Some assignments can make no sense.

    The result for dest 'tags' is split on ',' and the result for dest
    'authors' is split on '&'; empty items are dropped. Every other
    destination receives the formatted string unchanged. A failing
    operation is skipped (its traceback is printed when DEBUG is set) and
    the remaining operations still run. Nothing is done when ops is empty
    or None.
    '''
    if not ops:
        return
    from calibre.ebooks.metadata.book.formatter import SafeFormat
    formatter = SafeFormat()
    for op in ops:
        try:
            src, dest = op[0], op[1]
            val = formatter.safe_format(src, other, 'PLUGBOARD TEMPLATE ERROR', other)
            if dest == 'tags':
                val = [f.strip() for f in val.split(',') if f.strip()]
            elif dest == 'authors':
                val = [f.strip() for f in val.split('&') if f.strip()]
            self.set(dest, val)
        except Exception:
            # Best effort: ordinary failures are ignored, but
            # KeyboardInterrupt and SystemExit are allowed to propagate.
            if DEBUG:
                traceback.print_exc()
# Old Metadata API {{{
def print_all_attributes(self):
    '''
    Print every standard metadata field of this object, then the metadata
    of every custom field that has any, followed by a separator line.
    '''
    for name in STANDARD_METADATA_FIELDS:
        prints('%s:'%name, getattr(self, name, 'None'))
    for key in self.custom_field_keys():
        meta = self.get_user_metadata(key, make_copy=False)
        if meta is None:
            continue
        prints(key, meta)
    prints('--------------')
def smart_update(self, other, replace_metadata=False):
    '''
    Merge the information in `other` into self. In case of conflicts, the information
    in `other` takes precedence, unless the information in `other` is NULL.

    When replace_metadata is true, the copyable fields, tags, cover data
    and all custom field metadata of self are overwritten with the values
    from `other`. Otherwise only non-null values are copied, and tags,
    multi-valued custom text fields, cover data, comments and identifiers
    are merged with what self already holds.
    '''
    def copy_not_none(dest, src, attr):
        # Deep copy attr from src to dest unless it is None or equal to
        # the NULL value registered for that attribute.
        v = getattr(src, attr, None)
        if v not in (None, NULL_VALUES.get(attr, None)):
            setattr(dest, attr, copy.deepcopy(v))

    unknown = _('Unknown')
    # Title: only taken when other has a real (non-placeholder) title
    if other.title and other.title != unknown:
        self.title = other.title
        if hasattr(other, 'title_sort'):
            self.title_sort = other.title_sort

    # Authors: taken when other has real authors, or when other only has
    # the placeholder and self has nothing better (no authors, or the
    # placeholder as both sole author and author_sort)
    if other.authors and (
            other.authors[0] != unknown or (
                not self.authors or (
                    len(self.authors) == 1 and self.authors[0] == unknown and
                    getattr(self, 'author_sort', None) == unknown
                )
            )
    ):
        self.authors = list(other.authors)
        if hasattr(other, 'author_sort_map'):
            self.author_sort_map = dict(other.author_sort_map)
        if hasattr(other, 'author_sort'):
            self.author_sort = other.author_sort

    if replace_metadata:
        # SPECIAL_FIELDS = frozenset(['lpath', 'size', 'comments', 'thumbnail'])
        # Overwrite unconditionally; a missing series_index defaults to 1.0
        for attr in SC_COPYABLE_FIELDS:
            setattr(self, attr, getattr(other, attr, 1.0 if
                    attr == 'series_index' else None))
        self.tags = other.tags
        self.cover_data = getattr(other, 'cover_data',
                                  NULL_VALUES['cover_data'])
        self.set_all_user_metadata(other.get_all_user_metadata(make_copy=True))
        for x in SC_FIELDS_COPY_NOT_NULL:
            copy_not_none(self, other, x)
        if callable(getattr(other, 'get_identifiers', None)):
            self.set_identifiers(other.get_identifiers())
        # language is handled below
    else:
        for attr in SC_COPYABLE_FIELDS:
            copy_not_none(self, other, attr)
        for x in SC_FIELDS_COPY_NOT_NULL:
            copy_not_none(self, other, x)

        if other.tags:
            # Case-insensitive but case preserving merging
            lotags = [t.lower() for t in other.tags]
            lstags = [t.lower() for t in self.tags]
            ot, st = map(frozenset, (lotags, lstags))
            # Tags present in both: adopt the spelling used by other
            for t in st.intersection(ot):
                sidx = lstags.index(t)
                oidx = lotags.index(t)
                self.tags[sidx] = other.tags[oidx]
            # Tags only present in other are appended
            self.tags += [t for t in other.tags if t.lower() in ot-st]

        if getattr(other, 'cover_data', False):
            # Keep whichever cover has more data in its last element
            other_cover = other.cover_data[-1]
            self_cover = self.cover_data[-1] if self.cover_data else b''
            if not self_cover:
                self_cover = b''
            if not other_cover:
                other_cover = b''
            if len(other_cover) > len(self_cover):
                self.cover_data = other.cover_data

        if callable(getattr(other, 'custom_field_keys', None)):
            for x in other.custom_field_keys():
                meta = other.get_user_metadata(x, make_copy=True)
                if meta is not None:
                    # Read the current value before the metadata (and with
                    # it the stored value) is replaced by other's copy
                    self_tags = self.get(x, [])
                    self.set_user_metadata(x, meta)  # get... did the deepcopy
                    other_tags = other.get(x, [])
                    if meta['datatype'] == 'text' and meta['is_multiple']:
                        # Case-insensitive but case preserving merging
                        lotags = [t.lower() for t in other_tags]
                        try:
                            lstags = [t.lower() for t in self_tags]
                        except TypeError:
                            # Happens if x is not a text, is_multiple field
                            # on self
                            lstags = []
                            self_tags = []
                        ot, st = map(frozenset, (lotags, lstags))
                        for t in st.intersection(ot):
                            sidx = lstags.index(t)
                            oidx = lotags.index(t)
                            self_tags[sidx] = other_tags[oidx]
                        self_tags += [t for t in other_tags if t.lower() in ot-st]
                        setattr(self, x, self_tags)

        # Comments: the longer text (ignoring surrounding whitespace) wins
        my_comments = getattr(self, 'comments', '')
        other_comments = getattr(other, 'comments', '')
        if not my_comments:
            my_comments = ''
        if not other_comments:
            other_comments = ''
        if len(other_comments.strip()) > len(my_comments.strip()):
            self.comments = other_comments

        # Copy all the non-none identifiers
        if callable(getattr(other, 'get_identifiers', None)):
            d = self.get_identifiers()
            s = other.get_identifiers()
            d.update([v for v in iteritems(s) if v[1] is not None])
            self.set_identifiers(d)
        else:
            # other structure not Metadata. Copy the top-level identifiers
            for attr in TOP_LEVEL_IDENTIFIERS:
                copy_not_none(self, other, attr)

    # Languages are replaced in both modes, unless other has none or only
    # the 'und' code
    other_lang = getattr(other, 'languages', [])
    if other_lang and other_lang != ['und']:
        self.languages = list(other_lang)
    # A series index is meaningless without a series
    if not getattr(self, 'series', None):
        self.series_index = None
def format_series_index(self, val=None):
    '''
    Format a series index for display using fmt_sidx.

    val defaults to this object's series_index. A value that cannot be
    converted to float is formatted as 1.
    '''
    from calibre.ebooks.metadata import fmt_sidx
    raw = val if val is not None else self.series_index
    try:
        number = float(raw)
    except Exception:
        number = 1
    return fmt_sidx(number)
def authors_from_string(self, raw):
    '''
    Set self.authors to the list obtained by passing raw through
    string_to_authors.
    '''
    from calibre.ebooks.metadata import string_to_authors as parse_authors
    self.authors = parse_authors(raw)
def format_authors(self):
    '''
    Return self.authors rendered as a single string by authors_to_string.
    '''
    from calibre.ebooks.metadata import authors_to_string as join_authors
    names = self.authors
    return join_authors(names)
def format_tags(self):
    '''
    Return the tags, ordered by sort_key, as one comma separated string.
    '''
    ordered = sorted(self.tags, key=sort_key)
    return ', '.join(unicode_type(tag) for tag in ordered)
def format_rating(self, v=None, divide_by=1):
    '''
    Return a rating divided by divide_by as a string.

    v defaults to this object's rating; when both are None the string
    'None' is returned.
    '''
    if v is not None:
        return unicode_type(v/divide_by)
    if self.rating is None:
        return 'None'
    return unicode_type(self.rating/divide_by)
def format_field(self, key, series_with_index=True):
    '''
    Returns the tuple (display_name, formatted_value), i.e. the first two
    items of what format_field_extended returns for the same arguments.
    '''
    display_name, formatted, _original, _fmeta = \
        self.format_field_extended(key, series_with_index)
    return display_name, formatted
def format_field_extended(self, key, series_with_index=True):
    '''
    returns the tuple (display_name, formatted_value, original_value,
    field_metadata)

    key may name a custom field, the index of a custom series
    ('#name_index'), a top-level identifier, or a standard field or one
    of its search-term aliases. For a custom or standard field whose
    value is None or '', the result is (display_name, value, None, None).
    A key that matches nothing yields (None, None, None, None).
    When series_with_index is true, series values get their formatted
    index appended in square brackets.
    '''
    from calibre.ebooks.metadata import authors_to_string
    from calibre.utils.date import format_date

    # Handle custom series index
    if key.startswith('#') and key.endswith('_index'):
        tkey = key[:-6]  # strip the _index
        cmeta = self.get_user_metadata(tkey, make_copy=False)
        if cmeta and cmeta['datatype'] == 'series':
            if self.get(tkey):
                res = self.get_extra(tkey)
                return (unicode_type(cmeta['name']+'_index'),
                        self.format_series_index(res), res, cmeta)
            else:
                return (unicode_type(cmeta['name']+'_index'), '', '', cmeta)

    if key in self.custom_field_keys():
        res = self.get(key, None)  # get evaluates all necessary composites
        cmeta = self.get_user_metadata(key, make_copy=False)
        name = unicode_type(cmeta['name'])
        if res is None or res == '':  # can't check "not res" because of numeric fields
            return (name, res, None, None)
        orig_res = res
        datatype = cmeta['datatype']
        if datatype == 'text' and cmeta['is_multiple']:
            res = cmeta['is_multiple']['list_to_ui'].join(res)
        elif datatype == 'series' and series_with_index:
            if self.get_extra(key) is not None:
                res = res + \
                    ' [%s]'%self.format_series_index(val=self.get_extra(key))
        elif datatype == 'datetime':
            res = format_date(res, cmeta['display'].get('date_format','dd MMM yyyy'))
        elif datatype == 'bool':
            res = _('Yes') if res else _('No')
        elif datatype == 'rating':
            res = '%.2g'%(res/2)
        elif datatype in ['int', 'float']:
            try:
                fmt = cmeta['display'].get('number_format', None)
                res = fmt.format(res)
            except Exception:
                # No usable number format (missing, None or invalid):
                # fall back to the plain value.
                pass
        return (name, unicode_type(res), orig_res, cmeta)

    # convert top-level ids into their value
    if key in TOP_LEVEL_IDENTIFIERS:
        fmeta = field_metadata['identifiers']
        name = key
        res = self.get(key, None)
        return (name, res, res, fmeta)

    # Translate aliases into the standard field name
    fmkey = field_metadata.search_term_to_field_key(key)
    if fmkey in field_metadata and field_metadata[fmkey]['kind'] == 'field':
        res = self.get(key, None)
        fmeta = field_metadata[fmkey]
        name = unicode_type(fmeta['name'])
        if res is None or res == '':
            return (name, res, None, None)
        orig_res = res
        datatype = fmeta['datatype']
        if key == 'authors':
            res = authors_to_string(res)
        elif key == 'series_index':
            res = self.format_series_index(res)
        elif datatype == 'text' and fmeta['is_multiple']:
            if isinstance(res, dict):
                # e.g. identifiers: render each pair as 'key:value'
                res = [k + ':' + v for k,v in res.items()]
            res = fmeta['is_multiple']['list_to_ui'].join(sorted(filter(None, res), key=sort_key))
        elif datatype == 'series' and series_with_index:
            res = res + ' [%s]'%self.format_series_index()
        elif datatype == 'datetime':
            res = format_date(res, fmeta['display'].get('date_format','dd MMM yyyy'))
        elif datatype == 'rating':
            res = '%.2g'%(res/2)
        elif key == 'size':
            res = human_readable(res)
        return (name, unicode_type(res), orig_res, fmeta)

    return (None, None, None, None)
def __unicode__representation__(self):
    '''
    Build a multi-line text representation of this object, suitable for
    printing to console. Each line is a left-aligned label padded to 20
    characters, a colon and the value. The title is always included; most
    other fields only when they are set.
    '''
    from calibre.utils.date import isoformat
    from calibre.ebooks.metadata import authors_to_string
    lines = []

    def add(label, value):
        lines.append('%-20s: %s'%(unicode_type(label), unicode_type(value)))

    add('Title', self.title)
    if self.title_sort:
        add('Title sort', self.title_sort)
    if self.authors:
        author_text = authors_to_string(self.authors)
        # Show the sort string only when it carries real information
        if self.author_sort and self.author_sort != _('Unknown'):
            author_text = author_text + (' [' + self.author_sort + ']')
        add('Author(s)', author_text)
    if self.publisher:
        add('Publisher', self.publisher)
    if getattr(self, 'book_producer', False):
        add('Book Producer', self.book_producer)
    if self.tags:
        add('Tags', ', '.join([unicode_type(t) for t in self.tags]))
    if self.series:
        add('Series', self.series + ' #%s'%self.format_series_index())
    if not self.is_null('languages'):
        add('Languages', ', '.join(self.languages))
    if self.rating is not None:
        # The stored rating is halved for display; zero shows as empty
        if self.rating:
            rating_text = '%.2g'%(float(self.rating)/2)
        else:
            rating_text = ''
        add('Rating', rating_text)
    if self.timestamp is not None:
        add('Timestamp', isoformat(self.timestamp))
    if self.pubdate is not None:
        add('Published', isoformat(self.pubdate))
    if self.rights is not None:
        add('Rights', unicode_type(self.rights))
    if self.identifiers:
        pairs = ['%s:%s'%(k, v) for k, v in iteritems(self.identifiers)]
        add('Identifiers', ', '.join(pairs))
    if self.comments:
        add('Comments', self.comments)

    for key in self.custom_field_keys():
        if self.get(key, None):
            name, val = self.format_field(key)
            add(name, unicode_type(val))
    return '\n'.join(lines)
def to_html(self):
    '''
    Build a HTML representation of this object: a <table> with one row
    per field, holding the field label in bold and its value. Values are
    inserted as they are, without HTML escaping.
    '''
    from calibre.ebooks.metadata import authors_to_string
    from calibre.utils.date import isoformat
    rows = [
        (_('Title'), unicode_type(self.title)),
        (_('Author(s)'), (authors_to_string(self.authors) if self.authors else _('Unknown'))),
        (_('Publisher'), unicode_type(self.publisher)),
        (_('Producer'), unicode_type(self.book_producer)),
        (_('Comments'), unicode_type(self.comments)),
        ('ISBN', unicode_type(self.isbn)),
        (_('Tags'), ', '.join([unicode_type(t) for t in self.tags])),
    ]
    if self.series:
        rows.append((_('Series'), unicode_type(self.series) + ' #%s'%self.format_series_index()))
    rows.append((_('Languages'), ', '.join(self.languages)))
    if self.timestamp is not None:
        rows.append((_('Timestamp'), unicode_type(isoformat(self.timestamp, as_utc=False, sep=' '))))
    if self.pubdate is not None:
        rows.append((_('Published'), unicode_type(isoformat(self.pubdate, as_utc=False, sep=' '))))
    if self.rights is not None:
        rows.append((_('Rights'), unicode_type(self.rights)))
    # Custom fields are listed only when they hold a truthy value
    for key in self.custom_field_keys():
        if self.get(key, None):
            name, val = self.format_field(key)
            rows.append((name, val))
    rendered = ['<tr><td><b>%s</b></td><td>%s</td></tr>'%row for row in rows]
    return '<table>%s</table>'%'\n'.join(rendered)
# Expose __unicode__representation__ under the name each Python major
# version uses for text conversion.
if ispy3:
    # Python 3: str(obj) returns the text representation directly
    __str__ = __unicode__representation__
else:
    # Python 2: unicode(obj) returns the text representation ...
    __unicode__ = __unicode__representation__

    def __str__(self):
        # ... and str(obj) returns it encoded as UTF-8 bytes
        return self.__unicode__().encode('utf-8')
def __nonzero__(self):
    '''
    Truth value of the object: true when any of title, authors, comments
    or tags is set.
    '''
    # The attribute is 'authors' (a list), as used by every other method
    # here; the former 'author' lookup was only ever reached when the
    # title was empty, and then did not test the authors at all.
    return bool(self.title or self.authors or self.comments or self.tags)


# Python 3 spelling of the truth-value hook
__bool__ = __nonzero__
# }}}
def field_from_string(field, raw, field_metadata):
    ''' Parse the string raw to return an object that is suitable for calling
    set() on a Metadata object.

    The conversion is chosen by field_metadata['datatype']: int and float
    are parsed as numbers, a rating is parsed as float and doubled, a
    datetime is parsed with parse_only_date, a bool accepts
    true/yes/y/false/no/n in any case (anything else raises ValueError)
    and a multi-valued text field is split on its 'ui_to_list' separator.
    For the 'identifiers' field the items become a dict split on the
    first ':'; for 'languages' they are canonicalized and unrecognized
    ones dropped. Everything else is returned as raw. '''
    dt = field_metadata['datatype']
    if dt == 'int':
        return int(raw)
    if dt == 'float':
        return float(raw)
    if dt == 'rating':
        return float(raw) * 2
    if dt == 'datetime':
        from calibre.utils.date import parse_only_date
        return parse_only_date(raw)
    if dt == 'bool':
        lowered = raw.lower()
        if lowered in {'true', 'yes', 'y'}:
            return True
        if lowered in {'false', 'no', 'n'}:
            return False
        raise ValueError('Unknown value for %s: %s'%(field, raw))
    if dt == 'text':
        ism = field_metadata['is_multiple']
        if ism:
            items = [x.strip() for x in raw.split(ism['ui_to_list'])]
            if field == 'identifiers':
                parsed = {}
                for item in items:
                    head, _sep, tail = item.partition(':')
                    parsed[head] = tail
                return parsed
            if field == 'languages':
                from calibre.utils.localization import canonicalize_lang
                canonical = [canonicalize_lang(x) for x in items]
                return [x for x in canonical if x]
            return items
    return raw

View File

@@ -0,0 +1,46 @@
#!/usr/bin/env python2
# vim:fileencoding=utf-8
from __future__ import absolute_import, division, print_function, unicode_literals
__license__ = 'GPL v3'
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
from calibre.ebooks.metadata.book import TOP_LEVEL_IDENTIFIERS, ALL_METADATA_FIELDS
from calibre.utils.formatter import TemplateFormatter
class SafeFormat(TemplateFormatter):
    '''
    Template formatter that resolves template field names against the
    book object held in self.book.
    NOTE(review): self.book is not assigned here; presumably it is set by
    TemplateFormatter before get_value is called -- confirm.
    '''

    def __init__(self):
        TemplateFormatter.__init__(self)

    def get_value(self, orig_key, args, kwargs):
        '''
        Return the formatted value of the field named orig_key (matched
        case-insensitively) on self.book, or '' when the key is empty or
        the field has no value.

        Names that are neither 'title_sort', a top-level identifier nor a
        known metadata field are translated with
        field_metadata.search_term_to_field_key. If that yields nothing
        the book knows about, a plain attribute of the book with that name
        is accepted; otherwise ValueError is raised.
        '''
        if not orig_key:
            return ''
        key = orig_key = orig_key.lower()
        if (key != 'title_sort' and key not in TOP_LEVEL_IDENTIFIERS and
                key not in ALL_METADATA_FIELDS):
            from calibre.ebooks.metadata.book.base import field_metadata
            key = field_metadata.search_term_to_field_key(key)
            if key is None or (self.book and
                               key not in self.book.all_field_keys()):
                if hasattr(self.book, orig_key):
                    key = orig_key
                else:
                    raise ValueError(_('Value: unknown field ') + orig_key)
        try:
            b = self.book.get_user_metadata(key, False)
        except Exception:
            # Not a custom field, or the lookup itself failed
            b = None
        if b and b['datatype'] in {'int', 'float'} and self.book.get(key, None) is None:
            # An unset custom numeric field renders as empty
            v = ''
        else:
            v = self.book.format_field(key, series_with_index=False)[1]
        if v is None or v == '':
            return ''
        return v

View File

@@ -0,0 +1,218 @@
from __future__ import absolute_import, division, print_function, unicode_literals
'''
Created on 4 Jun 2010
@author: charles
'''
import json, traceback
from datetime import datetime, time
from calibre.ebooks.metadata.book import SERIALIZABLE_FIELDS
from calibre.constants import filesystem_encoding, preferred_encoding
from calibre.library.field_metadata import FieldMetadata
from calibre import isbytestring
from polyglot.builtins import iteritems, itervalues, as_bytes
from polyglot.binary import as_base64_unicode, from_base64_bytes
# Translate datetimes to and from strings. The string form is the datetime in
# UTC. The returned date is also UTC
def string_to_datetime(src):
    '''
    Parse src with parse_iso8601 and return the result. Returns None for
    the string "None" and for anything that fails to parse.
    '''
    from calibre.utils.iso8601 import parse_iso8601
    if src == "None":
        return None
    try:
        return parse_iso8601(src)
    except Exception:
        return None
def datetime_to_string(dateval):
    '''
    Serialize a date or datetime with isoformat.

    A value that is not a datetime is combined with midnight; a naive
    datetime is given the local timezone. None, and any value not later
    than UNDEFINED_DATE, is serialized as the string "None".
    '''
    from calibre.utils.date import isoformat, UNDEFINED_DATE, local_tz
    if dateval is None:
        return "None"
    if not isinstance(dateval, datetime):
        dateval = datetime.combine(dateval, time())
    if getattr(dateval, 'tzinfo', False) is None:
        dateval = dateval.replace(tzinfo=local_tz)
    return "None" if dateval <= UNDEFINED_DATE else isoformat(dateval)
def encode_thumbnail(thumbnail):
    '''
    Encode the image part of a thumbnail, then return the 3 part tuple
    (width, height, base64 text).

    thumbnail is either such a (width, height, data) tuple/list already,
    or bare image data whose size is then detected with identify.
    Returns None for None, for undetectable images and for negative sizes.
    '''
    from calibre.utils.imghdr import identify
    if thumbnail is None:
        return None
    if isinstance(thumbnail, (tuple, list)):
        parts = thumbnail
    else:
        try:
            width, height = identify(as_bytes(thumbnail))[1:]
            if width < 0 or height < 0:
                return None
            parts = (width, height, thumbnail)
        except Exception:
            return None
    return (parts[0], parts[1], as_base64_unicode(parts[2]))
def decode_thumbnail(tup):
    '''
    Decode an encoded thumbnail into its 3 component parts: the first two
    items are passed through and the third is base64 decoded. None is
    returned unchanged.
    '''
    if tup is None:
        return None
    width, height, encoded = tup[0], tup[1], tup[2]
    return (width, height, from_base64_bytes(encoded))
def object_to_unicode(obj, enc=preferred_encoding):
    '''
    Recursively decode every byte string inside obj using enc, replacing
    undecodable bytes.

    Lists and tuples are returned as lists; dicts are returned as new
    dicts with both keys and values converted. Any other object is
    returned unchanged.
    '''
    if isbytestring(obj):
        return obj.decode(enc, 'replace')
    if isinstance(obj, (list, tuple)):
        # enc is passed down so nested containers use the same encoding
        # as the top level, instead of silently falling back to the default.
        return [object_to_unicode(x, enc=enc) for x in obj]
    if isinstance(obj, dict):
        return {
            object_to_unicode(k, enc=enc): object_to_unicode(v, enc=enc)
            for k, v in obj.items()
        }
    return obj
def encode_is_multiple(fm):
    '''
    Rewrite, in place, the 'is_multiple' entry of the field metadata dict
    fm into its single-character form, keeping the original dict value
    under 'is_multiple2'.

    The character is ',' for composite fields and '|' otherwise. When
    'is_multiple' is missing or falsy it becomes None and 'is_multiple2'
    becomes {}.
    '''
    current = fm.get('is_multiple', None)
    if not current:
        fm['is_multiple'] = None
        fm['is_multiple2'] = {}
        return
    # migrate is_multiple back to a character
    fm['is_multiple2'] = current
    fm['is_multiple'] = ',' if fm.get('datatype', None) == 'composite' else '|'
def decode_is_multiple(fm):
    '''
    Restore, in place, the dict form of 'is_multiple' in the field
    metadata dict fm.

    A non-empty 'is_multiple2' entry is moved to 'is_multiple'. Otherwise
    a truthy legacy 'is_multiple' value is replaced by a separator dict
    chosen from the datatype and the 'is_names' display flag, and a None
    value becomes {}.
    '''
    modern = fm.get('is_multiple2', None)
    if modern:
        fm['is_multiple'] = modern
        del fm['is_multiple2']
        return
    # Must migrate the is_multiple from char to dict
    legacy = fm.get('is_multiple', {})
    if legacy:
        if fm.get('datatype', None) == 'composite':
            legacy = {'cache_to_list': ',', 'ui_to_list': ',',
                      'list_to_ui': ', '}
        elif fm.get('display', {}).get('is_names', False):
            legacy = {'cache_to_list': '|', 'ui_to_list': '&',
                      'list_to_ui': ', '}
        else:
            legacy = {'cache_to_list': '|', 'ui_to_list': ',',
                      'list_to_ui': ', '}
    elif legacy is None:
        legacy = {}
    fm['is_multiple'] = legacy
class JsonCodec(object):
    '''
    Convert book metadata objects to and from JSON-compatible structures.

    field_metadata is consulted for the datatype of each key; a new
    FieldMetadata() is created when none (or a falsy one) is supplied.
    '''

    def __init__(self, field_metadata=None):
        self.field_metadata = field_metadata or FieldMetadata()

    def encode_to_file(self, file_, booklist):
        '''Write booklist to file_ as indented JSON, encoded as UTF-8 bytes.'''
        data = json.dumps(self.encode_booklist_metadata(booklist), indent=2)
        if not isinstance(data, bytes):
            data = data.encode('utf-8')
        file_.write(data)

    def encode_booklist_metadata(self, booklist):
        '''Return the list of encoded metadata dicts, one per book.'''
        return [self.encode_book_metadata(book) for book in booklist]

    def encode_book_metadata(self, book):
        '''Return a dict holding every SERIALIZABLE_FIELDS key of book, encoded.'''
        return {key: self.encode_metadata_attr(book, key)
                for key in SERIALIZABLE_FIELDS}

    def encode_metadata_attr(self, book, key):
        '''
        Return the JSON-compatible form of the value of key on book.

        Custom field metadata is copied, with datetime values turned into
        strings and is_multiple encoded. Thumbnails, byte strings and
        datetime fields get their dedicated encoding; everything else is
        passed through object_to_unicode.
        '''
        if key == 'user_metadata':
            meta = book.get_all_user_metadata(make_copy=True)
            for fm in meta.values():
                if fm['datatype'] == 'datetime':
                    fm['#value#'] = datetime_to_string(fm['#value#'])
                encode_is_multiple(fm)
            return meta
        if key in self.field_metadata:
            datatype = self.field_metadata[key]['datatype']
        else:
            datatype = None
        value = book.get(key)
        if key == 'thumbnail':
            return encode_thumbnail(value)
        elif isbytestring(value):  # str includes bytes
            enc = filesystem_encoding if key == 'lpath' else preferred_encoding
            return object_to_unicode(value, enc=enc)
        elif datatype == 'datetime':
            return datetime_to_string(value)
        else:
            return object_to_unicode(value)

    def decode_from_file(self, file_, booklist, book_class, prefix):
        '''
        Read a JSON list of books from file_ and append each successfully
        decoded book to booklist. Errors are printed, not raised.
        '''
        try:
            # No encoding argument: json.load dropped it in Python 3.9
            # (where passing it raises TypeError), and UTF-8 is what gets
            # used without it on both Python 2 and 3.
            js = json.load(file_)
            for item in js:
                entry = self.raw_to_book(item, book_class, prefix)
                if entry is not None:
                    booklist.append(entry)
        except Exception:
            print('exception during JSON decode_from_file')
            traceback.print_exc()

    def raw_to_book(self, json_book, book_class, prefix):
        '''
        Build a book_class(prefix, lpath) instance from the decoded JSON
        dict json_book. Returns None (after printing the traceback) when
        anything goes wrong.
        '''
        try:
            book = book_class(prefix, json_book.get('lpath', None))
            for key, val in json_book.items():
                meta = self.decode_metadata(key, val)
                if key == 'user_metadata':
                    book.set_all_user_metadata(meta)
                else:
                    # 'classifiers' is the old name of 'identifiers'
                    if key == 'classifiers':
                        key = 'identifiers'
                    setattr(book, key, meta)
            return book
        except Exception:
            print('exception during JSON decoding')
            traceback.print_exc()
            return None

    def decode_metadata(self, key, value):
        '''
        Return the Python form of the JSON value stored under key.

        Custom field metadata is updated in place (datetime strings
        parsed, is_multiple decoded); datetime fields are parsed,
        thumbnails are decoded and anything else is returned unchanged.
        '''
        if key == 'classifiers':
            key = 'identifiers'
        if key == 'user_metadata':
            for fm in value.values():
                if fm['datatype'] == 'datetime':
                    fm['#value#'] = string_to_datetime(fm['#value#'])
                decode_is_multiple(fm)
            return value
        elif key in self.field_metadata:
            if self.field_metadata[key]['datatype'] == 'datetime':
                return string_to_datetime(value)
        if key == 'thumbnail':
            return decode_thumbnail(value)
        return value

View File

@@ -0,0 +1,412 @@
#!/usr/bin/env python2
# vim:fileencoding=utf-8
from __future__ import absolute_import, division, print_function, unicode_literals
__license__ = 'GPL v3'
__copyright__ = '2013, Kovid Goyal <kovid at kovidgoyal.net>'
'''
Try to read metadata from an HTML file.
'''
import re
import unittest
from collections import defaultdict
from html5_parser import parse
from lxml.etree import Comment
from calibre.ebooks.metadata import string_to_authors, authors_to_string
from calibre.ebooks.metadata.book.base import Metadata
from calibre.ebooks.chardet import xml_to_unicode
from calibre import replace_entities, isbytestring
from calibre.utils.date import parse_date, is_date_undefined
from polyglot.builtins import iteritems
def get_metadata(stream):
    '''
    Read the whole of stream and return the metadata that get_metadata_
    extracts from its content.
    '''
    return get_metadata_(stream.read())
# Metadata field -> name used for that field inside HTML comments,
# e.g. <!-- TITLE="..." -->
COMMENT_NAMES = {
    'title': 'TITLE',
    'authors': 'AUTHOR',
    'publisher': 'PUBLISHER',
    'isbn': 'ISBN',
    'languages': 'LANGUAGE',
    'pubdate': 'PUBDATE',
    'timestamp': 'TIMESTAMP',
    'series': 'SERIES',
    'series_index': 'SERIESNUMBER',
    'rating': 'RATING',
    'comments': 'COMMENTS',
    'tags': 'TAGS',
}

# Metadata field -> all <meta name="..."> values (lower case) accepted for
# that field
META_NAMES = {
    'title' : ('dc.title', 'dcterms.title', 'title'),
    'authors': ('author', 'dc.creator.aut', 'dcterms.creator.aut', 'dc.creator'),
    'publisher': ('publisher', 'dc.publisher', 'dcterms.publisher'),
    'isbn': ('isbn',),
    'languages': ('dc.language', 'dcterms.language'),
    'pubdate': ('pubdate', 'date of publication', 'dc.date.published', 'dc.date.publication', 'dc.date.issued', 'dcterms.issued'),
    'timestamp': ('timestamp', 'date of creation', 'dc.date.created', 'dc.date.creation', 'dcterms.created'),
    'series': ('series',),
    'series_index': ('seriesnumber', 'series_index', 'series.index'),
    'rating': ('rating',),
    'comments': ('comments', 'dc.description'),
    'tags': ('tags',),
}
# Reverse lookups: comment name -> field and meta name -> field
rmap_comment = {v:k for k, v in iteritems(COMMENT_NAMES)}
rmap_meta = {v:k for k, l in iteritems(META_NAMES) for v in l}


# Extract an HTML attribute value, supports both single and double quotes and
# single quotes inside double quotes and vice versa.
# The conditional groups make the content and the closing quote depend on
# which kind of opening quote matched.
attr_pat = r'''(?:(?P<sq>')|(?P<dq>"))(?P<content>(?(sq)[^']+|[^"]+))(?(sq)'|")'''
def handle_comment(data, comment_tags):
    '''
    Scan the text of one HTML comment for NAME="value" pairs and append
    the value of every recognized name (see rmap_comment), with entities
    replaced, to comment_tags[field].
    '''
    # The compiled pattern is cached on the function object itself
    try:
        pattern = handle_comment.pat
    except AttributeError:
        pattern = handle_comment.pat = re.compile(r'''(?P<name>\S+)\s*=\s*%s''' % attr_pat)

    for match in pattern.finditer(data):
        field = rmap_comment.get(match.group('name'))
        if not field:
            continue
        comment_tags[field].append(replace_entities(match.group('content')))
def parse_metadata(src):
    '''
    Parse the HTML in src and collect its raw metadata.

    Returns the tuple (comment_tags, meta_tags, meta_tag_ids, title):
    values found in HTML comments by field, values of recognized <meta>
    tags by field, identifier values by lower-cased scheme, and the text
    of the first <title> element that has any ('' if there is none).
    '''
    root = parse(src)
    comment_tags = defaultdict(list)
    meta_tags = defaultdict(list)
    meta_tag_ids = defaultdict(list)
    identifier_pat = re.compile(r'(?:dc|dcterms)[.:]identifier(?:\.|$)', flags=re.IGNORECASE)
    id_pat2 = re.compile(r'(?:dc|dcterms)[.:]identifier$', flags=re.IGNORECASE)

    for comment in root.iterdescendants(tag=Comment):
        if comment.text:
            handle_comment(comment.text, comment_tags)

    title = next((q.text for q in root.iterdescendants(tag='title') if q.text), '')

    for meta in root.iterdescendants(tag='meta'):
        name = meta.get('name')
        content = meta.get('content')
        if not (name and content):
            continue

        if identifier_pat.match(name) is None:
            # Ordinary field: try the name as written, then with ':'
            # replaced by '.'
            lowered = name.lower()
            field = rmap_meta.get(lowered) or rmap_meta.get(lowered.replace(':', '.'))
            if field:
                meta_tags[field].append(content)
            continue

        # Identifier: the scheme comes either from the scheme attribute
        # (plain "dc.identifier") or from the third name component
        # ("dc.identifier.<scheme>"), but never from both.
        if id_pat2.match(name) is not None:
            scheme = meta.get('scheme')
        else:
            scheme = None
            elements = re.split(r'[.:]', name)
            if len(elements) == 3 and not meta.get('scheme'):
                scheme = elements[2].strip()
        if scheme:
            meta_tag_ids[scheme.lower()].append(content)

    return comment_tags, meta_tags, meta_tag_ids, title
def get_metadata_(src, encoding=None):
    '''
    Build a Metadata object from the HTML source src.

    src may be bytes or text. Bytes are decoded with encoding when it is
    given, otherwise with xml_to_unicode. Only the first 150000 characters
    are examined. For each field, values found in HTML comments are used
    in preference to values from <meta> tags; identifiers are only read
    from <meta> tags.
    '''
    # Meta data definitions as in
    # https://www.mobileread.com/forums/showpost.php?p=712544&postcount=9

    if isbytestring(src):
        if not encoding:
            src = xml_to_unicode(src)[0]
        else:
            src = src.decode(encoding, 'replace')
    src = src[:150000]  # Searching shouldn't take too long
    comment_tags, meta_tags, meta_tag_ids, title_tag = parse_metadata(src)

    def get_all(field):
        # All non-blank values for field, stripped, or None if there are
        # none. Meta tag values are only used when no comment mentions
        # the field at all.
        ans = comment_tags.get(field, meta_tags.get(field, None))
        if ans:
            ans = [x.strip() for x in ans if x.strip()]
        if not ans:
            ans = None
        return ans

    def get(field):
        # First non-blank value for field, or None
        ans = get_all(field)
        if ans:
            ans = ans[0]
        return ans

    # Title
    title = get('title') or title_tag.strip() or _('Unknown')

    # Author
    authors = authors_to_string(get_all('authors')) or _('Unknown')

    # Create MetaInformation with Title and Author
    mi = Metadata(title, string_to_authors(authors))

    # Single-value text fields
    for field in ('publisher', 'isbn'):
        val = get(field)
        if val:
            setattr(mi, field, val)

    # Multi-value text fields
    for field in ('languages',):
        val = get_all(field)
        if val:
            setattr(mi, field, val)

    # HTML fields
    for field in ('comments',):
        val = get(field)
        if val:
            # The value is escaped so it is stored as HTML text
            setattr(mi, field, val.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;').replace('"', '&quot;').replace("'", '&apos;'))

    # Date fields
    for field in ('pubdate', 'timestamp'):
        try:
            val = parse_date(get(field))
        except:
            pass
        else:
            if not is_date_undefined(val):
                setattr(mi, field, val)

    # SERIES
    series = get('series')
    if series:
        # A trailing "[number]" in the series name is taken as the index
        pat = re.compile(r'\[([.0-9]+)\]$')
        match = pat.search(series)
        series_index = None
        if match is not None:
            try:
                series_index = float(match.group(1))
            except:
                pass
            series = series.replace(match.group(), '').strip()
        mi.series = series
        if series_index is None:
            # Fall back to the separate series index field; a value that
            # is not a number is kept as it is
            series_index = get('series_index')
            try:
                series_index = float(series_index)
            except:
                pass
        if series_index is not None:
            mi.series_index = series_index

    # RATING
    rating = get('rating')
    if rating:
        try:
            mi.rating = float(rating)
            # Ratings outside 0..10 are reset to 0
            if mi.rating < 0:
                mi.rating = 0
            if mi.rating > 10:
                mi.rating = 0
        except:
            pass

    # TAGS
    tags = get_all('tags')
    if tags:
        # Every value may itself be a comma separated list
        tags = [x.strip() for s in tags for x in s.split(',') if x.strip()]
        if tags:
            mi.tags = tags

    # IDENTIFIERS
    for (k,v) in iteritems(meta_tag_ids):
        # Only the first non-blank value of each scheme is used
        v = [x.strip() for x in v if x.strip()]
        if v:
            mi.set_identifier(k, v[0])

    return mi
class MetadataHtmlTest(unittest.TestCase):
def compare_metadata(self, meta_a, meta_b):
for attr in (
'title', 'authors', 'publisher', 'isbn', 'languages', 'pubdate', 'timestamp', 'series',
'series_index', 'rating', 'comments', 'tags', 'identifiers'
):
self.assertEqual(getattr(meta_a, attr), getattr(meta_b, attr))
def get_stream(self, test):
from io import BytesIO
raw = b'''\
<html>
<head>
'''
if test in {'title', 'meta_single', 'meta_multi', 'comment_single', 'comment_multi'}:
raw += b'''\
}
<title>A Title Tag &amp;amp; Title &#x24B8;</title>
'''
if test in {'meta_single', 'meta_multi', 'comment_single', 'comment_multi'}:
raw += b'''\
<meta name="dc:title" content="A Meta Tag &amp;amp; Title &#9400;" />
<meta name="dcterms.creator.aut" content="George Washington" />
<meta name="dc.publisher" content="Publisher A" />
<meta name="isbn" content="1234567890" />
<meta name="dc.language" content="English" />
<meta name="dc.date.published" content="2019-01-01" />
<meta name="dcterms.created" content="2018-01-01" />
<meta name="series" content="Meta Series" />
<meta name="seriesnumber" content="1" />
<meta name="rating" content="" />
<meta name="dc.description" content="" />
<meta name="tags" content="tag a, tag b" />
<meta name="dc.identifier.url" content="" />
<meta name="dc.identifier" scheme="" content="invalid" />
<meta name="dc.identifier." content="still invalid" />
<meta name="dc.identifier.conflicting" scheme="schemes" content="are also invalid" />
<meta name="dc.identifier.custom.subid" content="invalid too" />
'''
if test in {'meta_multi', 'comment_single', 'comment_multi'}:
raw += b'''\
<meta name="title" content="A Different Meta Tag &amp;amp; Title &#9400;" />
<meta name="author" content="John Adams with Thomas Jefferson" />
<meta name="publisher" content="Publisher B" />
<meta name="isbn" content="2345678901" />
<meta name="dcterms.language" content="Spanish" />
<meta name="date of publication" content="2017-01-01" />
<meta name="timestamp" content="2016-01-01" />
<meta name="series" content="Another Meta Series" />
<meta name="series.index" content="2" />
<meta name="rating" content="8" />
<meta name="comments" content="meta &quot;comments&quot; &#x2665; HTML &amp;amp;" />
<meta name="tags" content="tag c" />
<meta name="dc.identifier.url" content="http://google.com/search?q=calibre" />
'''
if test in {'comment_single', 'comment_multi'}:
raw += b'''\
<!-- TITLE="A Comment Tag &amp;amp; Title &#9400;" -->
<!-- AUTHOR="James Madison and James Monroe" -->
<!-- PUBLISHER="Publisher C" -->
<!-- ISBN="3456789012" -->
<!-- LANGUAGE="French" -->
<!-- PUBDATE="2015-01-01" -->
<!-- TIMESTAMP="2014-01-01" -->
<!-- SERIES="Comment Series" -->
<!-- SERIESNUMBER="3" -->
<!-- RATING="20" -->
<!-- COMMENTS="comment &quot;comments&quot; &#x2665; HTML -- too &amp;amp;" -->
<!-- TAGS="tag d" -->
'''
if test in {'comment_multi'}:
raw += b'''\
<!-- TITLE="Another Comment Tag &amp;amp; Title &#9400;" -->
<!-- AUTHOR="John Quincy Adams" -->
<!-- PUBLISHER="Publisher D" -->
<!-- ISBN="4567890123" -->
<!-- LANGUAGE="Japanese" -->
<!-- PUBDATE="2013-01-01" -->
<!-- TIMESTAMP="2012-01-01" -->
<!-- SERIES="Comment Series 2" -->
<!-- SERIESNUMBER="4" -->
<!-- RATING="1" -->
<!-- COMMENTS="comment &quot;comments&quot; &#x2665; HTML -- too &amp;amp; for sure" -->
<!-- TAGS="tag e, tag f" -->
'''
raw += b'''\
</head>
<body>
</body>
</html>
'''
return BytesIO(raw)
def test_input_title(self):
stream_meta = get_metadata(self.get_stream('title'))
canon_meta = Metadata('A Title Tag &amp; Title Ⓒ', [_('Unknown')])
self.compare_metadata(stream_meta, canon_meta)
def test_input_meta_single(self):
stream_meta = get_metadata(self.get_stream('meta_single'))
canon_meta = Metadata('A Meta Tag &amp; Title Ⓒ', ['George Washington'])
canon_meta.publisher = 'Publisher A'
canon_meta.languages = ['English']
canon_meta.pubdate = parse_date('2019-01-01')
canon_meta.timestamp = parse_date('2018-01-01')
canon_meta.series = 'Meta Series'
canon_meta.series_index = float(1)
# canon_meta.rating = float(0)
# canon_meta.comments = ''
canon_meta.tags = ['tag a', 'tag b']
canon_meta.set_identifiers({'isbn': '1234567890'})
self.compare_metadata(stream_meta, canon_meta)
def test_input_meta_multi(self):
stream_meta = get_metadata(self.get_stream('meta_multi'))
canon_meta = Metadata('A Meta Tag &amp; Title Ⓒ', ['George Washington', 'John Adams', 'Thomas Jefferson'])
canon_meta.publisher = 'Publisher A'
canon_meta.languages = ['English', 'Spanish']
canon_meta.pubdate = parse_date('2019-01-01')
canon_meta.timestamp = parse_date('2018-01-01')
canon_meta.series = 'Meta Series'
canon_meta.series_index = float(1)
canon_meta.rating = float(8)
canon_meta.comments = 'meta &quot;comments&quot; ♥ HTML &amp;amp;'
canon_meta.tags = ['tag a', 'tag b', 'tag c']
canon_meta.set_identifiers({'isbn': '1234567890', 'url': 'http://google.com/search?q=calibre'})
self.compare_metadata(stream_meta, canon_meta)
def test_input_comment_single(self):
stream_meta = get_metadata(self.get_stream('comment_single'))
canon_meta = Metadata('A Comment Tag &amp; Title Ⓒ', ['James Madison', 'James Monroe'])
canon_meta.publisher = 'Publisher C'
canon_meta.languages = ['French']
canon_meta.pubdate = parse_date('2015-01-01')
canon_meta.timestamp = parse_date('2014-01-01')
canon_meta.series = 'Comment Series'
canon_meta.series_index = float(3)
canon_meta.rating = float(0)
canon_meta.comments = 'comment &quot;comments&quot; ♥ HTML -- too &amp;amp;'
canon_meta.tags = ['tag d']
canon_meta.set_identifiers({'isbn': '3456789012', 'url': 'http://google.com/search?q=calibre'})
self.compare_metadata(stream_meta, canon_meta)
def test_input_comment_multi(self):
    # Expected metadata for the 'comment_multi' fixture stream.
    expected = Metadata(
        'A Comment Tag &amp; Title Ⓒ',
        ['James Madison', 'James Monroe', 'John Quincy Adams'],
    )
    expected.publisher = 'Publisher C'
    expected.languages = ['French', 'Japanese']
    expected.pubdate = parse_date('2015-01-01')
    expected.timestamp = parse_date('2014-01-01')
    expected.series = 'Comment Series'
    expected.series_index = 3.0
    expected.rating = 0.0
    expected.comments = 'comment &quot;comments&quot; ♥ HTML -- too &amp;amp;'
    expected.tags = ['tag d', 'tag e', 'tag f']
    expected.set_identifiers({
        'isbn': '3456789012',
        'url': 'http://google.com/search?q=calibre',
    })

    actual = get_metadata(self.get_stream('comment_multi'))
    self.compare_metadata(actual, expected)
def find_tests():
    """Return a test suite holding every test of MetadataHtmlTest."""
    loader = unittest.TestLoader()
    return loader.loadTestsFromTestCase(MetadataHtmlTest)

View File

@@ -0,0 +1,243 @@
from __future__ import absolute_import, division, print_function, unicode_literals
__license__ = 'GPL v3'
__copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
import os, re, collections
from calibre.utils.config import prefs
from calibre.constants import filesystem_encoding
from calibre.ebooks.metadata.opf2 import OPF
from calibre import isbytestring
from calibre.customize.ui import get_file_type_metadata, set_file_type_metadata
from calibre.ebooks.metadata import MetaInformation, string_to_authors
from polyglot.builtins import getcwd, unicode_type
# Priority of each file type as a source of metadata. The extensions are
# listed from lowest to highest priority: metadata read from a format with a
# higher value should be used to update metadata read from a lower one.
# Extensions that are not listed get the default priority 0.
METADATA_PRIORITIES = collections.defaultdict(int)
for i, ext in enumerate((
        'html', 'htm', 'xhtml', 'xhtm',
        'rtf', 'fb2', 'pdf', 'prc', 'odt',
        'epub', 'lit', 'lrx', 'lrf', 'mobi',
        'azw', 'azw3', 'azw1', 'rb', 'imp', 'snb',
)):
    METADATA_PRIORITIES[ext] = i + 1
def path_to_ext(path):
    """Return the lower-cased extension of *path* without the leading dot."""
    _root, dotted_ext = os.path.splitext(path)
    return dotted_ext[1:].lower()
def metadata_from_formats(formats, force_read_metadata=False, pattern=None):
    """
    Read metadata from a collection of format files, never failing on a
    reading error.

    If reading from the files raises, the metadata is derived from the file
    name of the first entry of `formats` instead.

    @param formats: Paths of the available format files.
    @param force_read_metadata: Passed through to the metadata readers.
    @param pattern: Compiled filename pattern, passed through to
                    metadata_from_filename.
    """
    try:
        return _metadata_from_formats(formats, force_read_metadata, pattern)
    except Exception:
        # Only ordinary errors trigger the fallback; KeyboardInterrupt and
        # SystemExit are allowed to propagate.
        mi = metadata_from_filename(list(iter(formats))[0], pat=pattern)
        if not mi.authors:
            mi.authors = [_('Unknown')]
        return mi
def _metadata_from_formats(formats, force_read_metadata=False, pattern=None):
    """
    Merge the metadata of all `formats`, lowest priority format first.

    `formats` is sorted in place by METADATA_PRIORITIES. An OPF file with a
    title short-circuits the merge, as does any merged result that carries an
    application_id.
    """
    merged = MetaInformation(None, None)
    formats.sort(key=lambda fpath: METADATA_PRIORITIES[path_to_ext(fpath)])
    extensions = [path_to_ext(fpath) for fpath in formats]

    if 'opf' in extensions:
        from_opf = opf_metadata(formats[extensions.index('opf')])
        if from_opf is not None and from_opf.title:
            return from_opf

    for fpath, ext in zip(formats, extensions):
        with lopen(fpath, 'rb') as stream:
            try:
                merged.smart_update(get_metadata(
                    stream, stream_type=ext,
                    use_libprs_metadata=True,
                    force_read_metadata=force_read_metadata,
                    pattern=pattern))
            except Exception:
                # An unreadable format must not prevent reading the others
                continue
            if getattr(merged, 'application_id', None) is not None:
                return merged

    if not merged.title:
        merged.title = _('Unknown')
    if not merged.authors:
        merged.authors = [_('Unknown')]
    return merged
def get_metadata(stream, stream_type='lrf', use_libprs_metadata=False,
                 force_read_metadata=False, pattern=None):
    """
    Read metadata from `stream` and put the stream position back afterwards.

    The position is restored (to 0 when the stream has no tell()) even when
    reading raises.
    """
    start = stream.tell() if hasattr(stream, 'tell') else 0
    try:
        return _get_metadata(stream, stream_type, use_libprs_metadata,
                             force_read_metadata, pattern)
    finally:
        if hasattr(stream, 'seek'):
            stream.seek(start)
def _get_metadata(stream, stream_type, use_libprs_metadata,
                  force_read_metadata=False, pattern=None):
    """
    Build metadata for `stream` from its file name, its contents and an OPF
    file of the same base name next to it, if one is readable.

    @param stream: File like object; its `name` attribute, when present, is
                   used to find the OPF file and to parse the file name.
    @param stream_type: File type (extension) of the stream.
    @param use_libprs_metadata: If true and the OPF file has an
                   application_id, the OPF metadata is returned directly.
    @param force_read_metadata: Read metadata from the file contents even
                   when the 'read_file_metadata' preference is off.
    @param pattern: Compiled filename pattern for metadata_from_filename.
    """
    if stream_type:
        stream_type = stream_type.lower()
    # Collapse related extensions onto the type used for the lookup.
    # 'htm' used to be missing here because 'html' was listed twice.
    if stream_type in ('html', 'htm', 'xhtml', 'xhtm', 'xml'):
        stream_type = 'html'
    if stream_type in ('mobi', 'prc', 'azw'):
        stream_type = 'mobi'
    if stream_type in ('odt', 'ods', 'odp', 'odg', 'odf'):
        stream_type = 'odt'

    opf = None
    if hasattr(stream, 'name'):
        c = os.path.splitext(stream.name)[0]+'.opf'
        if os.access(c, os.R_OK):
            opf = opf_metadata(os.path.abspath(c))

    if use_libprs_metadata and getattr(opf, 'application_id', None) is not None:
        return opf

    name = os.path.basename(getattr(stream, 'name', ''))
    # The fallback pattern matches the default filename format produced by calibre
    base = metadata_from_filename(name, pat=pattern, fallback_pat=re.compile(
        r'^(?P<title>.+) - (?P<author>[^-]+)$'))
    if not base.authors:
        base.authors = [_('Unknown')]
    if not base.title:
        base.title = _('Unknown')
    mi = MetaInformation(None, None)
    if force_read_metadata or prefs['read_file_metadata']:
        mi = get_file_type_metadata(stream, stream_type)
    # File contents update the file name data, the OPF file updates both
    base.smart_update(mi)
    if opf is not None:
        base.smart_update(opf)

    return base
def set_metadata(stream, mi, stream_type='lrf', report_error=None):
    """Write `mi` into `stream` using the writer for the lower-cased `stream_type`."""
    ftype = stream_type.lower() if stream_type else stream_type
    set_file_type_metadata(stream, mi, ftype, report_error=report_error)
def metadata_from_filename(name, pat=None, fallback_pat=None):
    """
    Derive metadata from a file name by matching it against a regular
    expression with named groups.

    The groups looked up are: title, author, series, series_index, isbn,
    publisher, published and comments. Groups missing from the pattern are
    skipped.

    @param name: The file name. Everything after the last '.' is dropped and
                 underscores are replaced by spaces before matching.
    @param pat: Compiled pattern; defaults to the 'filename_pattern'
                preference.
    @param fallback_pat: Compiled pattern tried when `pat` does not match.
    """
    if isbytestring(name):
        name = name.decode(filesystem_encoding, 'replace')
    # NOTE(review): a name without any '.' becomes the empty string here
    name = name.rpartition('.')[0]
    mi = MetaInformation(None, None)
    if pat is None:
        pat = re.compile(prefs.get('filename_pattern'))
    name = name.replace('_', ' ')
    match = pat.search(name)
    if match is None and fallback_pat is not None:
        match = fallback_pat.search(name)
    if match is not None:
        # match.group() raises IndexError for a group the pattern lacks
        try:
            mi.title = match.group('title')
        except IndexError:
            pass
        try:
            au = match.group('author')
            aus = string_to_authors(au)
            if aus:
                mi.authors = aus
                if prefs['swap_author_names'] and mi.authors:
                    def swap(a):
                        # Move the last part ("Last, First" or "First Last")
                        # to the front of the name
                        if ',' in a:
                            parts = a.split(',', 1)
                        else:
                            parts = a.split(None, 1)
                        if len(parts) > 1:
                            t = parts[-1]
                            parts = parts[:-1]
                            parts.insert(0, t)
                        return ' '.join(parts)
                    mi.authors = [swap(x) for x in mi.authors]
        except (IndexError, ValueError):
            pass
        try:
            mi.series = match.group('series')
        except IndexError:
            pass
        try:
            si = match.group('series_index')
            mi.series_index = float(si)
        except (IndexError, ValueError, TypeError):
            pass
        try:
            si = match.group('isbn')
            mi.isbn = si
        except (IndexError, ValueError):
            pass
        try:
            publisher = match.group('publisher')
            mi.publisher = publisher
        except (IndexError, ValueError):
            pass
        try:
            pubdate = match.group('published')
            if pubdate:
                from calibre.utils.date import parse_only_date
                mi.pubdate = parse_only_date(pubdate)
        except Exception:
            # An unparseable date is ignored, but KeyboardInterrupt and
            # SystemExit are no longer swallowed
            pass
        try:
            comments = match.group('comments')
            mi.comments = comments
        except (IndexError, ValueError):
            pass
    if mi.is_null('title'):
        mi.title = name
    return mi
def opf_metadata(opfpath):
    """
    Read book metadata, including cover data, from an OPF file.

    @param opfpath: Path to the OPF file or a file like object. A file
                    opened here from a path is closed again before
                    returning; a file like object passed in is left open.
    @return: The metadata when the OPF has an application_id, otherwise
             None. None is also returned when reading fails; the traceback
             is printed in that case.
    """
    if hasattr(opfpath, 'read'):
        f = opfpath
        opfpath = getattr(f, 'name', getcwd())
        opened_here = False
    else:
        f = open(opfpath, 'rb')
        opened_here = True
    try:
        opf = OPF(f, os.path.dirname(opfpath))
        if opf.application_id is not None:
            mi = opf.to_book_metadata()
            if hasattr(opf, 'cover') and opf.cover:
                cpath = os.path.join(os.path.dirname(opfpath), opf.cover)
                if os.access(cpath, os.R_OK):
                    fmt = cpath.rpartition('.')[-1]
                    # Separate name, so the OPF file handle is not rebound
                    with open(cpath, 'rb') as cover_file:
                        data = cover_file.read()
                    mi.cover_data = (fmt, data)
            return mi
    except Exception:
        import traceback
        traceback.print_exc()
    finally:
        if opened_here:
            f.close()
def forked_read_metadata(path, tdir):
    """
    Read the metadata of the file at `path` and store the results in `tdir`.

    Written to `tdir`: size.txt (size of the file in bytes), cover.jpg (only
    when the metadata carries cover data) and metadata.opf.
    """
    from calibre.ebooks.metadata.opf2 import metadata_to_opf

    def in_tdir(filename):
        return os.path.join(tdir, filename)

    with lopen(path, 'rb') as src:
        fmt = os.path.splitext(path)[1][1:].lower()
        # Seek to the end to find out the size of the file
        src.seek(0, 2)
        size = src.tell()
        with lopen(in_tdir('size.txt'), 'wb') as size_file:
            size_file.write(unicode_type(size).encode('ascii'))
        src.seek(0)
        mi = get_metadata(src, fmt)

    if mi.cover_data and mi.cover_data[1]:
        with lopen(in_tdir('cover.jpg'), 'wb') as cover_file:
            cover_file.write(mi.cover_data[1])
        # The OPF refers to the cover file instead of embedding the data
        mi.cover_data = (None, None)
        mi.cover = 'cover.jpg'

    opf_raw = metadata_to_opf(mi, default_lang='und')
    with lopen(in_tdir('metadata.opf'), 'wb') as opf_file:
        opf_file.write(opf_raw)

View File

@@ -0,0 +1,302 @@
#!/usr/bin/python2
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:fdm=marker:ai
#
# Copyright (C) 2006 Søren Roug, European Environment Agency
#
# This is free software. You may redistribute it under the terms
# of the Apache license and the GNU General Public License Version
# 2 or at your option any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public
# License along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
# Contributor(s):
#
from __future__ import absolute_import, division, print_function, unicode_literals
import io
import json
import os
import re
from lxml.etree import fromstring, tostring
from calibre.ebooks.metadata import (
MetaInformation, authors_to_string, check_isbn, string_to_authors
)
from calibre.utils.date import isoformat, parse_date
from calibre.utils.imghdr import identify
from calibre.utils.localization import canonicalize_lang, lang_as_iso639_1
from calibre.utils.zipfile import ZipFile, safe_replace
from odf.draw import Frame as odFrame, Image as odImage
from odf.namespaces import DCNS, METANS, OFFICENS
from odf.opendocument import load as odLoad
from polyglot.builtins import as_unicode
# Maps the name of each known ODF metadata field to its
# (namespace, tag name) pair.
fields = {
    'title': (DCNS, 'title'),
    'description': (DCNS, 'description'),
    'subject': (DCNS, 'subject'),
    'creator': (DCNS, 'creator'),
    'date': (DCNS, 'date'),
    'language': (DCNS, 'language'),
    'generator': (METANS, 'generator'),
    'initial-creator': (METANS, 'initial-creator'),
    'keyword': (METANS, 'keyword'),
    'keywords': (METANS, 'keywords'),
    'editing-duration': (METANS, 'editing-duration'),
    'editing-cycles': (METANS, 'editing-cycles'),
    'printed-by': (METANS, 'printed-by'),
    'print-date': (METANS, 'print-date'),
    'creation-date': (METANS, 'creation-date'),
    'user-defined': (METANS, 'user-defined'),
    # 'template': (METANS, 'template'),
}
def get_metadata(stream, extract_cover=True):
    """
    Read metadata from the meta.xml of the ODF document in `stream`.

    The standard fields are read first. When the user-defined field
    'opf.metadata' is set, the other user-defined 'opf.*' fields are applied
    on top of them. Unless 'opf.nocover' is set, a cover is searched for as
    well; an error while doing so is ignored.
    """
    whitespace = re.compile(r'\s+')

    def normalize(s):
        return whitespace.sub(' ', s).strip()

    with ZipFile(stream) as zf:
        root = fromstring(zf.read('meta.xml'))

        def find(field):
            # Normalized text of the first element for `field`, or None
            ns, tag = fields[field]
            matches = root.xpath('//ns0:{}'.format(tag), namespaces={'ns0': ns})
            if not matches:
                return None
            text = tostring(matches[0], method='text', encoding='unicode',
                            with_tail=False)
            return normalize(text)

        mi = MetaInformation(None, [])

        title = find('title')
        if title:
            mi.title = title

        creator = find('initial-creator') or find('creator')
        if creator:
            mi.authors = string_to_authors(creator)

        desc = find('description')
        if desc:
            mi.comments = desc

        lang = find('language')
        if lang and canonicalize_lang(lang):
            mi.languages = [canonicalize_lang(lang)]

        kw = find('keyword') or find('keywords')
        if kw:
            mi.tags = [t.strip() for t in kw.split(',') if t.strip()]

        # Collect the user-defined fields, keyed by lower-cased name
        data = {}
        user_ns = {'ns0': fields['user-defined'][0]}
        for node in root.xpath('//ns0:user-defined', namespaces=user_ns):
            name = (node.get('{%s}name' % METANS) or '').lower()
            vtype = node.get('{%s}value-type' % METANS) or 'string'
            val = node.text
            if not (name and val):
                continue
            if vtype == 'boolean':
                val = val == 'true'
            data[name] = val

        opfmeta = False  # we need this later for the cover
        opfnocover = False
        if data.get('opf.metadata'):
            # custom metadata contains OPF information
            opfmeta = True

            title_sort = data.get('opf.titlesort', '')
            if title_sort:
                mi.title_sort = title_sort

            authors = data.get('opf.authors', '')
            if authors:
                mi.authors = string_to_authors(authors)

            author_sort = data.get('opf.authorsort', '')
            if author_sort:
                mi.author_sort = author_sort

            raw_isbn = data.get('opf.isbn', '')
            if raw_isbn:
                isbn = check_isbn(raw_isbn)
                if isbn is not None:
                    mi.isbn = isbn

            publisher = data.get('opf.publisher', '')
            if publisher:
                mi.publisher = publisher

            pubdate = data.get('opf.pubdate', '')
            if pubdate:
                mi.pubdate = parse_date(pubdate, assume_utc=True)

            identifiers = data.get('opf.identifiers')
            if identifiers:
                try:
                    mi.identifiers = json.loads(identifiers)
                except Exception:
                    pass

            rating = data.get('opf.rating')
            if rating:
                try:
                    # Clamp the rating to the range 0 - 10
                    mi.rating = max(0, min(float(rating), 10))
                except Exception:
                    pass

            series = data.get('opf.series', '')
            if series:
                mi.series = series
                series_index = data.get('opf.seriesindex', '')
                if series_index:
                    try:
                        mi.series_index = float(series_index)
                    except Exception:
                        mi.series_index = 1.0

            language = data.get('opf.language', '')
            if language:
                cl = canonicalize_lang(language)
                if cl:
                    mi.languages = [cl]

            opfnocover = data.get('opf.nocover', False)

        if not opfnocover:
            try:
                read_cover(stream, zf, mi, opfmeta, extract_cover)
            except Exception:
                pass  # Do not let an error reading the cover prevent reading other data

    return mi
def set_metadata(stream, mi):
    """
    Replace the meta.xml of the ODF document in `stream` with a version
    updated from `mi`.
    """
    with ZipFile(stream) as zf:
        # zf.read() leaves no open member file behind
        raw = _set_metadata(zf.read('meta.xml'), mi)
        # print(raw.decode('utf-8'))

    # Rewind before the archive is rewritten (os.SEEK_SET was used as the
    # offset before, which is the same value 0)
    stream.seek(0)
    safe_replace(stream, "meta.xml", io.BytesIO(raw))
def _set_metadata(raw, mi):
    """
    Return the meta.xml document `raw` updated with the non null fields of
    `mi`, serialized as UTF-8 encoded bytes.

    Title, authors, comments, tags and language are stored in the standard
    fields; the remaining fields are stored as user-defined 'opf.*' fields.
    """
    root = fromstring(raw)
    namespaces = {'office': OFFICENS, 'meta': METANS, 'dc': DCNS}
    # Reverse map: namespace URI -> prefix
    nsrmap = {v: k for k, v in namespaces.items()}

    def xpath(expr, parent=root):
        return parent.xpath(expr, namespaces=namespaces)

    def remove(*tag_names):
        # Remove all elements for the given fields below office:meta
        for tag_name in tag_names:
            ns = fields[tag_name][0]
            tag_name = '{}:{}'.format(nsrmap[ns], tag_name)
            for x in xpath('descendant::' + tag_name, meta):
                x.getparent().remove(x)

    def add(tag, val=None):
        # Append a new element for the field `tag` to office:meta
        ans = meta.makeelement('{%s}%s' % fields[tag])
        ans.text = val
        meta.append(ans)
        return ans

    def remove_user_metadata(*names):
        # `names` are compared against the lower-cased name attribute
        for x in xpath('//meta:user-defined'):
            q = (x.get('{%s}name' % METANS) or '').lower()
            if q in names:
                x.getparent().remove(x)

    def add_um(name, val, vtype='string'):
        ans = add('user-defined', val)
        ans.set('{%s}value-type' % METANS, vtype)
        ans.set('{%s}name' % METANS, name)

    def add_user_metadata(name, val):
        # The first call also (re)writes the 'opf.metadata' marker. The
        # function attribute records that this has been done.
        if not hasattr(add_user_metadata, 'sentinel_added'):
            add_user_metadata.sentinel_added = True
            remove_user_metadata('opf.metadata')
            add_um('opf.metadata', 'true', 'boolean')
        val_type = 'string'
        if hasattr(val, 'strftime'):
            # Date-like values are stored as the date part of the ISO format
            val = isoformat(val, as_utc=True).split('T')[0]
            val_type = 'date'
        add_um(name, val, val_type)

    meta = xpath('//office:meta')[0]

    if not mi.is_null('title'):
        remove('title')
        add('title', mi.title)
        if not mi.is_null('title_sort'):
            remove_user_metadata('opf.titlesort')
            add_user_metadata('opf.titlesort', mi.title_sort)
    if not mi.is_null('authors'):
        remove('initial-creator', 'creator')
        val = authors_to_string(mi.authors)
        add('initial-creator', val), add('creator', val)
        remove_user_metadata('opf.authors')
        add_user_metadata('opf.authors', val)
        if not mi.is_null('author_sort'):
            remove_user_metadata('opf.authorsort')
            add_user_metadata('opf.authorsort', mi.author_sort)
    if not mi.is_null('comments'):
        remove('description')
        add('description', mi.comments)
    if not mi.is_null('tags'):
        remove('keyword')
        add('keyword', ', '.join(mi.tags))
    if not mi.is_null('languages'):
        lang = lang_as_iso639_1(mi.languages[0])
        if lang:
            remove('language')
            add('language', lang)
    if not mi.is_null('pubdate'):
        remove_user_metadata('opf.pubdate')
        add_user_metadata('opf.pubdate', mi.pubdate)
    if not mi.is_null('publisher'):
        remove_user_metadata('opf.publisher')
        add_user_metadata('opf.publisher', mi.publisher)
    if not mi.is_null('series'):
        remove_user_metadata('opf.series', 'opf.seriesindex')
        add_user_metadata('opf.series', mi.series)
        add_user_metadata('opf.seriesindex', '{}'.format(mi.series_index))
    if not mi.is_null('identifiers'):
        remove_user_metadata('opf.identifiers')
        add_user_metadata('opf.identifiers', as_unicode(json.dumps(mi.identifiers)))
    if not mi.is_null('rating'):
        remove_user_metadata('opf.rating')
        add_user_metadata('opf.rating', '%.2g' % mi.rating)

    return tostring(root, encoding='utf-8', pretty_print=True)
def read_cover(stream, zin, mi, opfmeta, extract_cover):
    """
    Find the cover image of an ODF document and record it in `mi`.

    @param stream: The document, as accepted by odf.opendocument.load.
    @param zin: The zip file the image data is read from.
    @param mi: Metadata object; cover, odf_cover_frame and, if
               `extract_cover` is true, cover_data are set on it when a
               cover is found.
    @param opfmeta: Whether the document carries OPF metadata, i.e. whether
               a frame named 'opf.cover' should be looked for.
    @param extract_cover: Whether to store the image data itself.
    """
    # search for an draw:image in a draw:frame with the name 'opf.cover'
    # if opf.metadata prop is false, just use the first image that
    # has a proper size (borrowed from docx)
    otext = odLoad(stream)
    cover_href = None
    cover_data = None
    cover_frame = None
    imgnum = 0
    for frm in otext.topnode.getElementsByType(odFrame):
        img = frm.getElementsByType(odImage)
        if len(img) == 0:
            continue
        i_href = img[0].getAttribute('href')
        try:
            raw = zin.read(i_href)
        except KeyError:
            continue
        try:
            fmt, width, height = identify(raw)
        except Exception:
            continue
        imgnum += 1
        # A frame without a name must not abort the search with an
        # AttributeError (assumes getAttribute returns None for a missing
        # attribute -- TODO confirm against odfpy)
        frame_name = frm.getAttribute('name') or ''
        if opfmeta and frame_name.lower() == 'opf.cover':
            cover_href = i_href
            cover_data = (fmt, raw)
            cover_frame = frame_name  # could have upper case
            break
        if cover_href is None and imgnum == 1 and 0.8 <= height/width <= 1.8 and height*width >= 12000:
            # Pick the first image as the cover if it is of a suitable size
            cover_href = i_href
            cover_data = (fmt, raw)
            if not opfmeta:
                break
            # With OPF metadata keep looking for an explicit 'opf.cover' frame

    if cover_href is not None:
        mi.cover = cover_href
        mi.odf_cover_frame = cover_frame
        if extract_cover:
            if not cover_data:
                raw = zin.read(cover_href)
                try:
                    fmt = identify(raw)[0]
                except Exception:
                    pass
                else:
                    cover_data = (fmt, raw)
            mi.cover_data = cover_data

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,251 @@
#!/usr/bin/env python2
# vim:fileencoding=utf-8
# License: GPLv3 Copyright: 2008, Kovid Goyal <kovid at kovidgoyal.net>
"""
Edit metadata in RTF files.
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import codecs
import re
from calibre import force_unicode
from calibre.ebooks.metadata import MetaInformation
from polyglot.builtins import codepoint_to_chr, string_or_bytes, unicode_type, int_to_byte, filter
# Each pattern finds one group inside the {\info ...} block and captures its
# raw contents up to the first closing brace that is not escaped by a
# backslash. Comments are read from \subject, tags from \category and the
# publisher from \manager.
title_pat = re.compile(br'\{\\info.*?\{\\title(.*?)(?<!\\)\}', re.DOTALL)
author_pat = re.compile(br'\{\\info.*?\{\\author(.*?)(?<!\\)\}', re.DOTALL)
comment_pat = re.compile(br'\{\\info.*?\{\\subject(.*?)(?<!\\)\}', re.DOTALL)
tags_pat = re.compile(br'\{\\info.*?\{\\category(.*?)(?<!\\)\}', re.DOTALL)
publisher_pat = re.compile(br'\{\\info.*?\{\\manager(.*?)(?<!\\)\}', re.DOTALL)
def get_document_info(stream):
    """
    Extract the \\info block from an RTF file.
    Return the info block as a string and the position in the file at which it
    starts. If there is no info block, or the block is not terminated before
    the end of the file, (None, 0) is returned.
    @param stream: File like object pointing to the RTF file.
    """
    block_size = 4096
    stream.seek(0)
    found, block = False, b""
    while not found:
        # Keep the tail of the previous block, so that a marker split over
        # two reads is still found
        prefix = block[-6:]
        block = prefix + stream.read(block_size)
        actual_block_size = len(block) - len(prefix)
        if len(block) == len(prefix):
            # Nothing more to read
            break
        idx = block.find(br'{\info')
        if idx >= 0:
            found = True
            pos = stream.tell() - actual_block_size + idx - len(prefix)
            stream.seek(pos)
        else:
            # Stop searching once a \sect control word has been seen
            if block.find(br'\sect') > -1:
                break
    if not found:
        return None, 0
    data, count = [], 0
    pos = stream.tell()
    while True:
        ch = stream.read(1)
        if not ch:
            # End of file inside the block: the braces never balance, so
            # without this check the loop would never terminate
            return None, 0
        if ch == b'\\':
            # Escaped character or control word: the next byte is never a
            # brace that counts
            nxt = stream.read(1)
            if not nxt:
                return None, 0
            data.append(ch + nxt)
            continue
        if ch == b'{':
            count += 1
        elif ch == b'}':
            count -= 1
        data.append(ch)
        if count == 0:
            break
    return b''.join(data), pos
def detect_codepage(stream):
    """
    Return the name of the codec for the \\ansicpg code page declared in the
    next 512 bytes of `stream`, or None if there is no declaration or no
    codec for it. Code page 0 is treated as 1252.
    """
    found = re.search(br'\\ansicpg(\d+)', stream.read(512))
    if found is None:
        return None
    digits = found.group(1)
    if digits == b'0':
        digits = b'1252'
    try:
        name = 'cp' + digits.decode('ascii')
        codecs.lookup(name)
    except Exception:
        return None
    return name
def encode(unistr):
    """
    Return `unistr` as ASCII only text: every character with a code of 128
    or more is replaced by the sequence \\uN? with N its decimal code.
    """
    text = unistr if isinstance(unistr, unicode_type) else force_unicode(unistr)
    pieces = []
    for ch in text:
        code = ord(ch)
        pieces.append(ch if code < 128 else '\\u{}?'.format(code))
    return ''.join(pieces)
def decode(raw, codec):
    """
    Replace the character escapes in `raw` by the characters they stand for.

    \\'hh escapes are decoded with `codec` (skipped when `codec` is None),
    \\uN escapes together with the one character following them are replaced
    by the character with code N. Escapes that cannot be decoded become '?'.
    """
    # https://en.wikipedia.org/wiki/Rich_Text_Format#Character_encoding

    def from_codepage(m):
        try:
            byte = int_to_byte(int(m.group(1), 16))
            return byte.decode(codec)
        except ValueError:
            return '?'

    def from_codepoint(m):
        try:
            return codepoint_to_chr(int(m.group(1)))
        except Exception:
            return '?'

    text = raw.decode('ascii', 'replace') if isinstance(raw, bytes) else raw
    if codec is not None:
        text = re.sub(r"\\'([a-fA-F0-9]{2})", from_codepage, text)
    return re.sub(r'\\u([0-9]{3,5}).', from_codepoint, text)
def get_metadata(stream):
    """
    Return metadata as a L{MetaInfo} object

    Only the title is set (to 'Unknown') when the stream does not start with
    the RTF signature or has no info block.
    """
    stream.seek(0)
    if stream.read(5) != br'{\rtf':
        return MetaInformation(_('Unknown'))
    block = get_document_info(stream)[0]
    if not block:
        return MetaInformation(_('Unknown'))

    stream.seek(0)
    cpg = detect_codepage(stream)
    stream.seek(0)

    def extract(pattern):
        # Decoded contents of the group `pattern` matches, None if no match
        found = pattern.search(block)
        if found is None:
            return None
        return decode(found.group(1).strip(), cpg)

    title = extract(title_pat)
    if title is None:
        title = _('Unknown')
    author = extract(author_pat)

    mi = MetaInformation(title)
    if author:
        mi.authors = [name.strip() for name in author.split(',')]

    comment = extract(comment_pat)
    if comment is not None:
        mi.comments = comment

    tags = extract(tags_pat)
    if tags is not None:
        mi.tags = list(filter(None, (tag.strip() for tag in tags.split(','))))

    publisher = extract(publisher_pat)
    if publisher is not None:
        mi.publisher = publisher

    return mi
def create_metadata(stream, options):
    """
    Insert a new info block, built from `options`, after the first six bytes
    of `stream`. Nothing is written if `options` has no metadata to store.
    """
    md = [r'{\info']
    if options.title:
        md.append(r'{\title %s}' % (encode(options.title),))
    if options.authors:
        au = options.authors
        if not isinstance(au, string_or_bytes):
            au = ', '.join(au)
        md.append(r'{\author %s}' % (encode(au),))
    # Either attribute name is accepted for the comments
    if hasattr(options, 'comment'):
        comp = options.comment
    else:
        comp = options.comments
    if comp:
        md.append(r'{\subject %s}' % (encode(comp),))
    if options.publisher:
        md.append(r'{\manager %s}' % (encode(options.publisher),))
    if options.tags:
        md.append(r'{\category %s}' % (encode(u', '.join(options.tags)),))

    if len(md) <= 1:
        return
    md.append('}')
    info = ''.join(md).encode('ascii')
    stream.seek(0)
    src = stream.read()
    stream.seek(0)
    stream.write(src[:6] + info + src[6:])
def set_metadata(stream, options):
    '''
    Modify/add RTF metadata in stream
    @param options: Object with metadata attributes title, authors, comments,
                    tags and publisher. Attributes that are None are left
                    untouched in an existing info block.
    '''
    def add_metadata_item(src, name, val):
        # Insert the new group before the closing brace of the info block
        index = src.rindex('}')
        return src[:index] + '{\\' + name + ' ' + val + '}}'

    src, pos = get_document_info(stream)
    if src is None:
        create_metadata(stream, options)
    else:
        src = src.decode('ascii')
        olen = len(src)

        base_pat = r'\{\\name(.*?)(?<!\\)\}'

        def replace_or_create(src, name, val):
            val = encode(val)
            pat = re.compile(base_pat.replace('name', name), re.DOTALL)
            replacement = '{\\' + name + ' ' + val + '}'
            # Use a function as replacement: a replacement string would have
            # its backslash escapes processed, turning \title into a tab
            # followed by "itle" and failing on \subject or \uNNNN?
            src, num = pat.subn(lambda m: replacement, src)
            if num == 0:
                src = add_metadata_item(src, name, val)
            return src

        if options.title is not None:
            src = replace_or_create(src, 'title', options.title)
        if options.comments is not None:
            src = replace_or_create(src, 'subject', options.comments)
        if options.authors is not None:
            src = replace_or_create(src, 'author', ', '.join(options.authors))
        if options.tags is not None:
            src = replace_or_create(src, 'category', ', '.join(options.tags))
        if options.publisher is not None:
            src = replace_or_create(src, 'manager', options.publisher)
        # Replace the old info block, keeping everything that follows it
        stream.seek(pos + olen)
        after = stream.read()
        stream.seek(pos)
        stream.truncate()
        stream.write(src.encode('ascii'))
        stream.write(after)
def find_tests():
    """Return the test suite for writing and reading back RTF metadata."""
    import unittest
    from io import BytesIO
    from calibre.ebooks.metadata.book.base import Metadata

    class Test(unittest.TestCase):

        def test_rtf_metadata(self):
            # Metadata written to a minimal document must be read back unchanged
            stream = BytesIO(br'{\rtf1\ansi\ansicpg1252}')
            written = Metadata('Test ø̄title', ['Author One', 'Author БTwo'])
            written.tags = 'tag1 見tag2'.split()
            written.comments = '<p>some ⊹comments</p>'
            written.publisher = 'publiSher'
            set_metadata(stream, written)
            stream.seek(0)
            read_back = get_metadata(stream)
            for attr in ('title', 'authors', 'publisher', 'comments', 'tags'):
                self.assertEqual(getattr(written, attr), getattr(read_back, attr))

    loader = unittest.defaultTestLoader
    return loader.loadTestsFromTestCase(Test)

View File

@@ -0,0 +1,296 @@
#!/usr/bin/env python2
from __future__ import absolute_import, division, print_function, unicode_literals
__license__ = 'GPL v3'
__copyright__ = '2010, Kovid Goyal <kovid at kovidgoyal.net>'
import os, glob, re, functools
from collections import Counter
from lxml import etree
from lxml.builder import ElementMaker
from calibre.constants import __appname__, __version__
from calibre.ebooks.chardet import xml_to_unicode
from calibre.utils.xml_parse import safe_xml_fromstring
from calibre.utils.cleantext import clean_xml_chars
from polyglot.builtins import unicode_type, getcwd
from polyglot.urllib import unquote, urlparse
# XML namespaces of the NCX format and of the calibre specific metadata
NCX_NS = "http://www.daisy.org/z3986/2005/ncx/"
CALIBRE_NS = "http://calibre.kovidgoyal.net/2009/metadata"
# NCX is the default namespace, calibre elements use the 'calibre' prefix
NSMAP = {None: NCX_NS, 'calibre':CALIBRE_NS}
# Element factories for the two namespaces
E = ElementMaker(namespace=NCX_NS, nsmap=NSMAP)
C = ElementMaker(namespace=CALIBRE_NS, nsmap=NSMAP)
def parse_html_toc(data):
    """
    Yield a (href, fragment, text) tuple for every link in the HTML `data`.

    href is the path component of the unquoted link target, fragment is None
    when the link has no fragment.
    """
    from html5_parser import parse
    from calibre.utils.cleantext import clean_xml_chars
    from lxml import etree
    if isinstance(data, bytes):
        data = xml_to_unicode(data, strip_encoding_pats=True, resolve_entities=True)[0]
    root = parse(clean_xml_chars(data), maybe_xhtml=True, keep_doctype=False,
                 sanitize_names=True)
    for link in root.xpath('//*[@href and local-name()="a"]'):
        target = urlparse(unquote(link.get('href')))
        href, fragment = target[2], target[5]
        fragment = fragment.strip() if fragment else None
        href = href.strip()
        text = etree.tostring(link, method='text', encoding='unicode')
        yield href, fragment, text
class TOC(list):
    """
    A table of contents entry. The entry is itself the list of its child
    entries, so a whole table of contents is a tree of TOC objects.
    """

    def __init__(self, href=None, fragment=None, text=None, parent=None,
                 play_order=0, base_path=None, type='unknown', author=None,
                 description=None, toc_thumbnail=None):
        """
        @param base_path: Directory that relative hrefs are resolved
                          against. Defaults to the working directory at the
                          time the entry is created.
        """
        self.href = href
        self.fragment = fragment
        if not self.fragment:
            self.fragment = None
        self.text = text
        self.parent = parent
        # Resolved here rather than in the signature, where the default
        # would be frozen to the working directory at import time
        self.base_path = getcwd() if base_path is None else base_path
        self.play_order = play_order
        self.type = type
        self.author = author
        self.description = description
        self.toc_thumbnail = toc_thumbnail

    def __str__(self):
        lines = ['TOC: %s#%s %s'%(self.href, self.fragment, self.text)]
        for child in self:
            c = unicode_type(child).splitlines()
            for l in c:
                # Children are indented one tab per level
                lines.append('\t'+l)
        return '\n'.join(lines)

    def count(self, type):
        """Return the number of entries of the given type in this tree."""
        return len([i for i in self.flat() if i.type == type])

    def purge(self, types, max=0):
        """
        Remove the entries whose type is in `types`, keeping the first `max`
        of them. Return the list of entries selected for removal.
        """
        remove = []
        for entry in self.flat():
            if entry.type in types:
                remove.append(entry)
        remove = remove[max:]
        for entry in remove:
            if entry.parent is None:
                continue
            entry.parent.remove(entry)
        return remove

    def remove(self, entry):
        """Remove the child `entry` and clear its parent."""
        list.remove(self, entry)
        entry.parent = None

    def add_item(self, href, fragment, text, play_order=None, type='unknown',
                 author=None, description=None, toc_thumbnail=None):
        """
        Append a new child entry and return it. By default its play order is
        one more than that of the last child, or of this entry if it has no
        children yet.
        """
        if play_order is None:
            play_order = (self[-1].play_order if len(self) else self.play_order) + 1
        self.append(TOC(href=href, fragment=fragment, text=text, parent=self,
                        base_path=self.base_path, play_order=play_order,
                        type=type, author=author, description=description, toc_thumbnail=toc_thumbnail))
        return self[-1]

    def top_level_items(self):
        """Iterate over the children that have a text."""
        for item in self:
            if item.text is not None:
                yield item

    def depth(self):
        """Return the number of levels of the tree rooted at self."""
        depth = 1
        for obj in self:
            c = obj.depth()
            if c > depth - 1:
                depth = c + 1
        return depth

    def flat(self):
        """Depth first iteration over the tree rooted at self"""
        yield self
        for obj in self:
            for i in obj.flat():
                yield i

    @property
    def abspath(self):
        """Return the file this toc entry points to as a absolute path to a file on the system."""
        if self.href is None:
            return None
        path = self.href.replace('/', os.sep)
        if not os.path.isabs(path):
            path = os.path.join(self.base_path, path)
        return path

    def read_from_opf(self, opfreader):
        """
        Populate this TOC from the table of contents `opfreader` refers to:
        the toc attribute of the spine, else the toc reference of the guide,
        else the first manifest item with 'toc' in its href.
        """
        toc = opfreader.soup.find('spine', toc=True)
        if toc is not None:
            toc = toc['toc']
        if toc is None:
            try:
                toc = opfreader.soup.find('guide').find('reference', attrs={'type':'toc'})['href']
            except Exception:
                for item in opfreader.manifest:
                    if 'toc' in item.href().lower():
                        toc = item.href()
                        break

        if toc is not None:
            if toc.lower() not in ('ncx', 'ncxtoc'):
                # The reference is a link to an HTML table of contents
                toc = urlparse(unquote(toc))[2]
                toc = toc.replace('/', os.sep)
                if not os.path.isabs(toc):
                    toc = os.path.join(self.base_path, toc)
                try:
                    if not os.path.exists(toc):
                        bn = os.path.basename(toc)
                        bn = bn.replace('_top.htm', '_toc.htm')  # Bug in BAEN OPF files
                        toc = os.path.join(os.path.dirname(toc), bn)

                    self.read_html_toc(toc)
                except Exception:
                    print('WARNING: Could not read Table of Contents. Continuing anyway.')
            else:
                # The reference is the id of the NCX file in the manifest
                path = opfreader.manifest.item(toc.lower())
                path = getattr(path, 'path', path)
                if path and os.access(path, os.R_OK):
                    try:
                        self.read_ncx_toc(path)
                    except Exception as err:
                        print('WARNING: Invalid NCX file:', err)
                    return
                # Fall back to any NCX file in the base directory
                cwd = os.path.abspath(self.base_path)
                m = glob.glob(os.path.join(cwd, '*.ncx'))
                if m:
                    toc = m[0]
                    self.read_ncx_toc(toc)

    def read_ncx_toc(self, toc, root=None):
        """
        Populate this TOC from the nav points of an NCX file.

        @param toc: Path to the NCX file; its directory becomes the base path.
        @param root: Already parsed root element; the file is only read when
                     this is None.
        Raises ValueError if there is no navmap element.
        """
        self.base_path = os.path.dirname(toc)
        if root is None:
            with open(toc, 'rb') as f:
                raw = xml_to_unicode(f.read(), assume_utf8=True,
                                     strip_encoding_pats=True)[0]
            root = safe_xml_fromstring(raw)
        xpn = {'re': 'http://exslt.org/regular-expressions'}
        XPath = functools.partial(etree.XPath, namespaces=xpn)

        def get_attr(node, default=None, attr='playorder'):
            # Value of the first non empty attribute whose lower-cased name
            # ends with `attr`
            for name, val in node.attrib.items():
                if name and val and name.lower().endswith(attr):
                    return val
            return default

        # Element names are matched case insensitively, ignoring namespaces
        nl_path = XPath('./*[re:match(local-name(), "navlabel$", "i")]')
        txt_path = XPath('./*[re:match(local-name(), "text$", "i")]')
        content_path = XPath('./*[re:match(local-name(), "content$", "i")]')
        np_path = XPath('./*[re:match(local-name(), "navpoint$", "i")]')

        def process_navpoint(np, dest):
            try:
                play_order = int(get_attr(np, 1))
            except Exception:
                play_order = 1
            href = fragment = text = None
            nd = dest
            nl = nl_path(np)
            if nl:
                nl = nl[0]
                text = ''
                for txt in txt_path(nl):
                    text += etree.tostring(txt, method='text',
                                           encoding='unicode', with_tail=False)
                content = content_path(np)
                if content and text:
                    content = content[0]
                    # if get_attr(content, attr='src'):
                    purl = urlparse(content.get('src'))
                    href, fragment = unquote(purl[2]), unquote(purl[5])
                    nd = dest.add_item(href, fragment, text)
                    nd.play_order = play_order

            # Children of a nav point that produced no entry are added to `dest`
            for c in np_path(np):
                process_navpoint(c, nd)

        nm = XPath('//*[re:match(local-name(), "navmap$", "i")]')(root)
        if not nm:
            raise ValueError('NCX files must have a <navmap> element.')
        nm = nm[0]

        for child in np_path(nm):
            process_navpoint(child, self)

    def read_html_toc(self, toc):
        """
        Add an entry for every link in the HTML file `toc` whose href and
        fragment are not in this TOC yet.
        """
        self.base_path = os.path.dirname(toc)
        with lopen(toc, 'rb') as f:
            parsed_toc = parse_html_toc(f.read())
        for href, fragment, txt in parsed_toc:
            add = True
            for i in self.flat():
                if i.href == href and i.fragment == fragment:
                    add = False
                    break
            if add:
                self.add_item(href, fragment, txt)

    def render(self, stream, uid):
        """Write this TOC to `stream` as an NCX document with the uid `uid`."""
        root = E.ncx(
            E.head(
                E.meta(name='dtb:uid', content=unicode_type(uid)),
                E.meta(name='dtb:depth', content=unicode_type(self.depth())),
                E.meta(name='dtb:generator', content='%s (%s)'%(__appname__,
                                                                  __version__)),
                E.meta(name='dtb:totalPageCount', content='0'),
                E.meta(name='dtb:maxPageNumber', content='0'),
            ),
            E.docTitle(E.text('Table of Contents')),
        )
        navmap = E.navMap()
        root.append(navmap)
        root.set('{http://www.w3.org/XML/1998/namespace}lang', 'en')
        # Running number used to generate the ids of the nav points
        c = Counter()

        def navpoint(parent, np):
            text = np.text
            if not text:
                text = ''
            c[1] += 1
            item_id = 'num_%d'%c[1]
            text = clean_xml_chars(text)
            elem = E.navPoint(
                E.navLabel(E.text(re.sub(r'\s+', ' ', text))),
                E.content(src=unicode_type(np.href)+(('#' + unicode_type(np.fragment))
                                                     if np.fragment else '')),
                id=item_id,
                playOrder=unicode_type(np.play_order)
            )
            au = getattr(np, 'author', None)
            if au:
                au = re.sub(r'\s+', ' ', au)
                elem.append(C.meta(au, name='author'))
            desc = getattr(np, 'description', None)
            if desc:
                desc = re.sub(r'\s+', ' ', desc)
                try:
                    elem.append(C.meta(desc, name='description'))
                except ValueError:
                    elem.append(C.meta(clean_xml_chars(desc), name='description'))
            idx = getattr(np, 'toc_thumbnail', None)
            if idx:
                elem.append(C.meta(idx, name='toc_thumbnail'))
            parent.append(elem)
            for np2 in np:
                navpoint(elem, np2)

        for np in self:
            navpoint(navmap, np)
        raw = etree.tostring(root, encoding='utf-8', xml_declaration=True,
                             pretty_print=True)
        stream.write(raw)

View File

@@ -0,0 +1,104 @@
#!/usr/bin/env python2
# vim:fileencoding=utf-8
# License: GPLv3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
from __future__ import absolute_import, division, print_function, unicode_literals
from collections import namedtuple
from calibre.ebooks.chardet import xml_to_unicode
from calibre.ebooks.oeb.base import OPF
from calibre.ebooks.oeb.polish.utils import guess_type
from calibre.spell import parse_lang_code
from calibre.utils.cleantext import clean_xml_chars
from calibre.utils.localization import lang_as_iso639_1
from calibre.utils.xml_parse import safe_xml_fromstring
from polyglot.builtins import filter, map
OPFVersion = namedtuple('OPFVersion', 'major minor patch')


def parse_opf_version(raw):
    """
    Parse a dotted version string into an OPFVersion.

    2.0.0 is returned if the major version is not a number. If another
    component is not a number, only the major version is kept. Missing
    components are 0 and components after the third are dropped.
    """
    parts = (raw or '').split('.')
    try:
        major = int(parts[0])
    except Exception:
        return OPFVersion(2, 0, 0)
    try:
        numbers = [int(part) for part in parts]
    except Exception:
        numbers = [major, 0, 0]
    # Pad to three components, then cut off any extra ones
    numbers += [0] * (3 - len(numbers))
    return OPFVersion(*numbers[:3])
def parse_opf(stream_or_path):
    """
    Parse an OPF file and return its root element.

    @param stream_or_path: File like object or path. A file opened here from
                           a path is closed again once it has been read.
    Raises ValueError if the file is empty or parsing yields no root element.
    """
    if hasattr(stream_or_path, 'read'):
        stream = stream_or_path
        raw = stream.read()
    else:
        with open(stream_or_path, 'rb') as stream:
            raw = stream.read()
    if not raw:
        raise ValueError('Empty file: '+getattr(stream, 'name', 'stream'))
    raw, encoding = xml_to_unicode(raw, strip_encoding_pats=True, resolve_entities=True, assume_utf8=True)
    # Drop anything in front of the first tag
    raw = raw[raw.find('<'):]
    root = safe_xml_fromstring(clean_xml_chars(raw))
    if root is None:
        raise ValueError('Not an OPF file')
    return root
def normalize_languages(opf_languages, mi_languages):
    """Preserve original country codes and use 2-letter lang codes where possible"""

    def parse(code):
        try:
            return parse_lang_code(code)
        except ValueError:
            return None

    # Country code used in the OPF for each language code
    cc_map = {}
    for parsed in map(parse, opf_languages):
        if parsed:
            cc_map[parsed.langcode] = parsed.countrycode

    ans = []
    for parsed in map(parse, mi_languages):
        if not parsed:
            continue
        lc = parsed.langcode
        cc = parsed.countrycode or cc_map.get(lc, None)
        lc = lang_as_iso639_1(lc) or lc
        if cc:
            lc += '-' + cc
        ans.append(lc)
    return ans
def ensure_unique(template, existing):
    """
    Return `template` if it is not in `existing`, otherwise the first of
    base-1.ext, base-2.ext, ... that is not in `existing`.
    """
    base, _dot, ext = template.rpartition('.')
    if base and ext:
        suffix = '.' + ext
    else:
        # No usable extension, the counter goes at the very end
        base, suffix = template, ''
    candidate, counter = template, 0
    while candidate in existing:
        counter += 1
        candidate = '{}-{:d}{}'.format(base, counter, suffix)
    return candidate
def create_manifest_item(root, href_template, id_template, media_type=None):
    """
    Append a new item to the manifest of the OPF tree `root` and return it.

    The href and id are made unique among all href and id attributes in the
    tree. The media type is guessed from `href_template` unless given.
    Returns None if the tree has no manifest.
    """
    known_ids = frozenset(root.xpath('//*/@id'))
    known_hrefs = frozenset(root.xpath('//*/@href'))
    href = ensure_unique(href_template, known_hrefs)
    item_id = ensure_unique(id_template, known_ids)
    manifest = root.find(OPF('manifest'))
    if manifest is None:
        return None
    item = manifest.makeelement(OPF('item'))
    item.set('href', href)
    item.set('id', item_id)
    item.set('media-type', media_type or guess_type(href_template))
    manifest.append(item)
    return item
def pretty_print_opf(root):
    """Apply pretty_opf and then pretty_xml_tree to the OPF tree `root`."""
    from calibre.ebooks.oeb.polish import pretty
    for prettify in (pretty.pretty_opf, pretty.pretty_xml_tree):
        prettify(root)