1
0
mirror of https://github.com/gryf/ebook-converter.git synced 2026-03-18 15:53:41 +01:00

Removed Windows bits.

This commit is contained in:
2020-06-17 19:23:52 +02:00
parent cd93b85eb0
commit 1a09e3db84
34 changed files with 147 additions and 820 deletions

View File

@@ -88,9 +88,7 @@ class OptionParser(optparse.OptionParser):
fg='cyan')
usage += ('\n\nWhenever you pass arguments to %prog that have spaces '
'in them, enclose the arguments in quotation marks. For '
'example: "{}"\n\n').format("C:\\some path with spaces"
if constants_old.iswindows
else '/some path/with spaces')
'example: "{}"\n\n').format('/some path/with spaces')
if version is None:
version = '%%prog (%s %s)' % (constants_old.__appname__,
constants.VERSION)

View File

@@ -14,7 +14,6 @@ import base64
from ebook_converter.constants_old import CONFIG_DIR_MODE
from ebook_converter.constants_old import config_dir
from ebook_converter.constants_old import filesystem_encoding
from ebook_converter.constants_old import iswindows
from ebook_converter.constants_old import preferred_encoding
from ebook_converter.utils.date import isoformat
from ebook_converter.utils import iso8601
@@ -79,7 +78,7 @@ def from_json(obj):
def force_unicode(x):
try:
return x.decode('mbcs' if iswindows else preferred_encoding)
return x.decode(preferred_encoding)
except UnicodeDecodeError:
try:
return x.decode(filesystem_encoding)

View File

@@ -3,7 +3,7 @@ import datetime
import time
import functools
from ebook_converter.constants_old import iswindows, isosx, plugins, preferred_encoding
from ebook_converter.constants_old import isosx, plugins, preferred_encoding
from ebook_converter.utils.iso8601 import utc_tz, local_tz, UNDEFINED_DATE
from ebook_converter.utils.localization import lcdata
@@ -13,38 +13,21 @@ _local_tz = local_tz
# When parsing ambiguous dates that could be either dd-MM Or MM-dd use the
# user's locale preferences
if iswindows:
import ctypes
LOCALE_SSHORTDATE, LOCALE_USER_DEFAULT = 0x1f, 0
buf = ctypes.create_string_buffer(b'\0', 255)
try:
ctypes.windll.kernel32.GetLocaleInfoA(LOCALE_USER_DEFAULT, LOCALE_SSHORTDATE, buf, 255)
parse_date_day_first = buf.value.index(b'd') < buf.value.index(b'M')
except:
parse_date_day_first = False
del ctypes, LOCALE_SSHORTDATE, buf, LOCALE_USER_DEFAULT
elif isosx:
try:
date_fmt = plugins['usbobserver'][0].date_format()
parse_date_day_first = date_fmt.index('d') < date_fmt.index('M')
except:
parse_date_day_first = False
else:
try:
def first_index(raw, queries):
for q in queries:
try:
return raw.index(q)
except ValueError:
pass
return -1
try:
def first_index(raw, queries):
for q in queries:
try:
return raw.index(q)
except ValueError:
pass
return -1
import locale
raw = locale.nl_langinfo(locale.D_FMT)
parse_date_day_first = first_index(raw, ('%d', '%a', '%A')) < first_index(raw, ('%m', '%b', '%B'))
del raw, first_index
except:
parse_date_day_first = False
import locale
raw = locale.nl_langinfo(locale.D_FMT)
parse_date_day_first = first_index(raw, ('%d', '%a', '%A')) < first_index(raw, ('%m', '%b', '%B'))
del raw, first_index
except:
parse_date_day_first = False
DEFAULT_DATE = datetime.datetime(2000,1,1, tzinfo=utc_tz)
EPOCH = datetime.datetime(1970, 1, 1, tzinfo=_utc_tz)
@@ -214,15 +197,10 @@ def strftime(fmt, t=None):
t[0] = replacement
t = time.struct_time(t)
ans = None
if iswindows:
if isinstance(fmt, bytes):
fmt = fmt.decode('mbcs', 'replace')
fmt = fmt.replace('%e', '%#d')
ans = plugins['winutil'][0].strftime(fmt, t)
else:
ans = time.strftime(fmt, t)
if isinstance(ans, bytes):
ans = ans.decode(preferred_encoding, 'replace')
ans = time.strftime(fmt, t)
if isinstance(ans, bytes):
ans = ans.decode(preferred_encoding, 'replace')
if early_year:
ans = ans.replace('_early year hack##', str(orig_year))
return ans

View File

@@ -10,7 +10,7 @@ from math import ceil
from ebook_converter import force_unicode, prints, sanitize_file_name
from ebook_converter.constants_old import (
filesystem_encoding, iswindows, plugins, preferred_encoding, isosx
filesystem_encoding, plugins, preferred_encoding, isosx
)
from ebook_converter.utils.localization import get_udc
@@ -47,9 +47,9 @@ def shorten_component(s, by_what):
def limit_component(x, limit=254):
# windows and macs use ytf-16 codepoints for length, linux uses arbitrary
# windows and macs use utf-16 codepoints for length, linux uses arbitrary
# binary data, but we will assume utf-8
filename_encoding_for_length = 'utf-16' if iswindows or isosx else 'utf-8'
filename_encoding_for_length = 'utf-8'
def encoded_length():
q = x if isinstance(x, bytes) else x.encode(filename_encoding_for_length)
@@ -100,7 +100,7 @@ def shorten_components_to(length, components, more_to_take=0, last_has_extension
def find_executable_in_path(name, path=None):
if path is None:
path = os.environ.get('PATH', '')
exts = '.exe .cmd .bat'.split() if iswindows and not name.endswith('.exe') else ('',)
exts = ('',)
path = path.split(os.pathsep)
for x in path:
for ext in exts:
@@ -118,15 +118,14 @@ def is_case_sensitive(path):
apply to the filesystem containing the directory in path.
'''
is_case_sensitive = False
if not iswindows:
name1, name2 = ('calibre_test_case_sensitivity.txt',
'calibre_TesT_CaSe_sensitiVitY.Txt')
f1, f2 = os.path.join(path, name1), os.path.join(path, name2)
if os.path.exists(f1):
os.remove(f1)
open(f1, 'w').close()
is_case_sensitive = not os.path.exists(f2)
name1, name2 = ('calibre_test_case_sensitivity.txt',
'calibre_TesT_CaSe_sensitiVitY.Txt')
f1, f2 = os.path.join(path, name1), os.path.join(path, name2)
if os.path.exists(f1):
os.remove(f1)
open(f1, 'w').close()
is_case_sensitive = not os.path.exists(f2)
os.remove(f1)
return is_case_sensitive
@@ -162,10 +161,6 @@ def case_preserving_open_file(path, mode='wb', mkdir_mode=0o777):
raise ValueError('Invalid path: %r'%path)
cpath = sep
if iswindows:
# Always upper case the drive letter and add a trailing slash so that
# the first os.listdir works correctly
cpath = components[0].upper() + sep
bdir = path if mode is None else os.path.dirname(path)
if not os.path.exists(bdir):
@@ -215,37 +210,6 @@ def case_preserving_open_file(path, mode='wb', mkdir_mode=0o777):
return ans, fpath
def windows_get_fileid(path):
''' The fileid uniquely identifies actual file contents (it is the same for
all hardlinks to a file). Similar to inode number on linux. '''
import win32file
from pywintypes import error
if isinstance(path, bytes):
path = path.decode(filesystem_encoding)
try:
h = win32file.CreateFileW(path, 0, 0, None, win32file.OPEN_EXISTING,
win32file.FILE_FLAG_BACKUP_SEMANTICS, 0)
try:
data = win32file.GetFileInformationByHandle(h)
finally:
win32file.CloseHandle(h)
except (error, EnvironmentError):
return None
return data[4], data[8], data[9]
def samefile_windows(src, dst):
samestring = (os.path.normcase(os.path.abspath(src)) ==
os.path.normcase(os.path.abspath(dst)))
if samestring:
return True
a, b = windows_get_fileid(src), windows_get_fileid(dst)
if a is None and b is None:
return False
return a == b
def samefile(src, dst):
'''
Check if two paths point to the same actual file on the filesystem. Handles
@@ -257,9 +221,6 @@ def samefile(src, dst):
case) even if the file does not exist. This is because I have no way of
knowing how reliable the GetFileInformationByHandle method is.
'''
if iswindows:
return samefile_windows(src, dst)
if hasattr(os.path, 'samefile'):
# Unix
try:
@@ -273,243 +234,20 @@ def samefile(src, dst):
return samestring
def windows_get_size(path):
''' On windows file sizes are only accurately stored in the actual file,
not in the directory entry (which could be out of date). So we open the
file, and get the actual size. '''
import win32file
if isinstance(path, bytes):
path = path.decode(filesystem_encoding)
h = win32file.CreateFileW(
path, 0, win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE | win32file.FILE_SHARE_DELETE,
None, win32file.OPEN_EXISTING, 0, None)
try:
return win32file.GetFileSize(h)
finally:
win32file.CloseHandle(h)
def windows_hardlink(src, dest):
import win32file, pywintypes
try:
win32file.CreateHardLink(dest, src)
except pywintypes.error as e:
msg = 'Creating hardlink from %s to %s failed: %%s' % (src, dest)
raise OSError(msg % e)
src_size = os.path.getsize(src)
# We open and close dest, to ensure its directory entry is updated
# see http://blogs.msdn.com/b/oldnewthing/archive/2011/12/26/10251026.aspx
for i in range(10):
# If we are on a network filesystem, we have to wait for some indeterminate time, since
# network file systems are the best thing since sliced bread
try:
if windows_get_size(dest) == src_size:
return
except EnvironmentError:
pass
time.sleep(0.3)
sz = windows_get_size(dest)
if sz != src_size:
msg = 'Creating hardlink from %s to %s failed: %%s' % (src, dest)
raise OSError(msg % ('hardlink size: %d not the same as source size' % sz))
def windows_fast_hardlink(src, dest):
import win32file, pywintypes
try:
win32file.CreateHardLink(dest, src)
except pywintypes.error as e:
msg = 'Creating hardlink from %s to %s failed: %%s' % (src, dest)
raise OSError(msg % e)
ssz, dsz = windows_get_size(src), windows_get_size(dest)
if ssz != dsz:
msg = 'Creating hardlink from %s to %s failed: %%s' % (src, dest)
raise OSError(msg % ('hardlink size: %d not the same as source size: %s' % (dsz, ssz)))
def windows_nlinks(path):
import win32file
dwFlagsAndAttributes = win32file.FILE_FLAG_BACKUP_SEMANTICS if os.path.isdir(path) else 0
if isinstance(path, bytes):
path = path.decode(filesystem_encoding)
handle = win32file.CreateFileW(path, win32file.GENERIC_READ, win32file.FILE_SHARE_READ, None, win32file.OPEN_EXISTING, dwFlagsAndAttributes, None)
try:
return win32file.GetFileInformationByHandle(handle)[7]
finally:
handle.Close()
class WindowsAtomicFolderMove(object):
'''
Move all the files inside a specified folder in an atomic fashion,
preventing any other process from locking a file while the operation is
incomplete. Raises an IOError if another process has locked a file before
the operation starts. Note that this only operates on the files in the
folder, not any sub-folders.
'''
def __init__(self, path):
self.handle_map = {}
import win32file, winerror
from pywintypes import error
from collections import defaultdict
if isinstance(path, bytes):
path = path.decode(filesystem_encoding)
if not os.path.exists(path):
return
names = os.listdir(path)
name_to_fileid = {x:windows_get_fileid(os.path.join(path, x)) for x in names}
fileid_to_names = defaultdict(set)
for name, fileid in name_to_fileid.items():
fileid_to_names[fileid].add(name)
for x in names:
f = os.path.normcase(os.path.abspath(os.path.join(path, x)))
if not os.path.isfile(f):
continue
try:
# Ensure the file is not read-only
win32file.SetFileAttributes(f, win32file.FILE_ATTRIBUTE_NORMAL)
except:
pass
try:
h = win32file.CreateFileW(f, win32file.GENERIC_READ,
win32file.FILE_SHARE_DELETE, None,
win32file.OPEN_EXISTING, win32file.FILE_FLAG_SEQUENTIAL_SCAN, 0)
except error as e:
if getattr(e, 'winerror', 0) == winerror.ERROR_SHARING_VIOLATION:
# The file could be a hardlink to an already opened file,
# in which case we use the same handle for both files
fileid = name_to_fileid[x]
found = False
if fileid is not None:
for other in fileid_to_names[fileid]:
other = os.path.normcase(os.path.abspath(os.path.join(path, other)))
if other in self.handle_map:
self.handle_map[f] = self.handle_map[other]
found = True
break
if found:
continue
self.close_handles()
if getattr(e, 'winerror', 0) == winerror.ERROR_SHARING_VIOLATION:
err = IOError(errno.EACCES,
'File is open in another process')
err.filename = f
raise err
prints('CreateFile failed for: %r' % f)
raise
except:
self.close_handles()
prints('CreateFile failed for: %r' % f)
raise
self.handle_map[f] = h
def copy_path_to(self, path, dest):
import win32file
handle = None
for p, h in self.handle_map.items():
if samefile_windows(path, p):
handle = h
break
if handle is None:
if os.path.exists(path):
raise ValueError('The file %r did not exist when this move'
' operation was started'%path)
else:
raise ValueError('The file %r does not exist'%path)
try:
windows_hardlink(path, dest)
return
except:
pass
win32file.SetFilePointer(handle, 0, win32file.FILE_BEGIN)
with open(dest, 'wb') as f:
while True:
hr, raw = win32file.ReadFile(handle, 1024*1024)
if hr != 0:
raise IOError(hr, 'Error while reading from %r'%path)
if not raw:
break
f.write(raw)
def release_file(self, path):
' Release the lock on the file pointed to by path. Will also release the lock on any hardlinks to path '
key = None
for p, h in self.handle_map.items():
if samefile_windows(path, p):
key = (p, h)
break
if key is not None:
import win32file
win32file.CloseHandle(key[1])
remove = [f for f, h in self.handle_map.items() if h is key[1]]
for x in remove:
self.handle_map.pop(x)
def close_handles(self):
import win32file
for h in self.handle_map.values():
win32file.CloseHandle(h)
self.handle_map = {}
def delete_originals(self):
import win32file
for path in self.handle_map:
win32file.DeleteFile(path)
self.close_handles()
def hardlink_file(src, dest):
if iswindows:
windows_hardlink(src, dest)
return
os.link(src, dest)
def nlinks_file(path):
' Return number of hardlinks to the file '
if iswindows:
return windows_nlinks(path)
return os.stat(path).st_nlink
if iswindows:
def rename_file(a, b):
move_file = plugins['winutil'][0].move_file
if isinstance(a, bytes):
a = a.decode('mbcs')
if isinstance(b, bytes):
b = b.decode('mbcs')
move_file(a, b)
def atomic_rename(oldpath, newpath):
'''Replace the file newpath with the file oldpath. Can fail if the files
are on different volumes. If succeeds, guaranteed to be atomic. newpath may
or may not exist. If it exists, it is replaced. '''
if iswindows:
for i in range(10):
try:
rename_file(oldpath, newpath)
break
except Exception:
if i > 8:
raise
# Try the rename repeatedly in case something like a virus
# scanner has opened one of the files (I love windows)
time.sleep(1)
else:
os.rename(oldpath, newpath)
os.rename(oldpath, newpath)
def remove_dir_if_empty(path, ignore_metadata_caches=False):
@@ -572,20 +310,7 @@ def copyfile(src, dest):
def get_hardlink_function(src, dest):
if iswindows:
import win32file, win32api
colon = b':' if isinstance(dest, bytes) else ':'
root = dest[0] + colon
try:
is_suitable = win32file.GetDriveType(root) not in (win32file.DRIVE_REMOTE, win32file.DRIVE_CDROM)
# See https://msdn.microsoft.com/en-us/library/windows/desktop/aa364993(v=vs.85).aspx
supports_hard_links = win32api.GetVolumeInformation(root + os.sep)[3] & 0x00400000
except Exception:
supports_hard_links = is_suitable = False
hardlink = windows_fast_hardlink if is_suitable and supports_hard_links and src[0].lower() == dest[0].lower() else None
else:
hardlink = os.link
return hardlink
return os.link
def copyfile_using_links(path, dest, dest_is_dir=True, filecopyfunc=copyfile):

View File

@@ -3,7 +3,7 @@ from collections import defaultdict
from threading import Thread
from ebook_converter import walk, prints
from ebook_converter.constants_old import iswindows, isosx
from ebook_converter.constants_old import isosx
from ebook_converter.constants_old import plugins, DEBUG
from ebook_converter.constants_old import filesystem_encoding
from ebook_converter.utils.fonts.metadata import FontMetadata, UnsupportedFont
@@ -90,14 +90,6 @@ def fc_list():
def font_dirs():
if iswindows:
winutil, err = plugins['winutil']
if err:
raise RuntimeError('Failed to load winutil: %s' % err)
try:
return [winutil.special_folder_path(winutil.CSIDL_FONTS)]
except ValueError:
return [r'C:\Windows\Fonts']
if isosx:
return [
'/Library/Fonts',

View File

@@ -6,10 +6,6 @@ from ebook_converter.utils.fonts.sfnt import UnknownTable, max_power_of_two
from ebook_converter.utils.fonts.sfnt.errors import UnsupportedFont
__license__ = 'GPL v3'
__copyright__ = '2012, Kovid Goyal <kovid at kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
# Note that the code for creating a BMP table (cmap format 4) is taken with
# thanks from the fonttools project (BSD licensed).

View File

@@ -12,7 +12,7 @@ from threading import Thread
#from PyQt5.QtGui import QColor, QImage, QImageReader, QImageWriter, QPixmap, QTransform
from ebook_converter import fit_image, force_unicode
from ebook_converter.constants_old import iswindows, plugins
from ebook_converter.constants_old import plugins
from ebook_converter.ptempfile import TemporaryDirectory
from ebook_converter.utils.config_base import tweaks
from ebook_converter.utils.filenames import atomic_rename
@@ -38,8 +38,6 @@ def normalize_format_name(fmt):
def get_exe_path(name):
from ebook_converter.ebooks.pdf.pdftohtml import PDFTOHTML
base = os.path.dirname(PDFTOHTML)
if iswindows:
name += '-calibre.exe'
if not base:
return name
return os.path.join(base, name)
@@ -47,12 +45,10 @@ def get_exe_path(name):
def load_jxr_data(data):
with TemporaryDirectory() as tdir:
if iswindows and isinstance(tdir, str):
tdir = tdir.encode('mbcs')
with open(os.path.join(tdir, 'input.jxr'), 'wb') as f:
f.write(data)
cmd = [get_exe_path('JxrDecApp'), '-i', 'input.jxr', '-o', 'output.tif']
creationflags = 0x08 if iswindows else 0
creationflags = 0
subprocess.Popen(cmd, cwd=tdir, stdout=open(os.devnull, 'wb'), stderr=subprocess.STDOUT, creationflags=creationflags).wait()
i = QImage()
if not i.load(os.path.join(tdir, 'output.tif')):
@@ -548,7 +544,7 @@ def run_optimizer(file_path, cmd, as_filter=False, input_data=None):
stdin = subprocess.PIPE if as_filter else None
stderr = subprocess.PIPE if as_filter else subprocess.STDOUT
creationflags = 0x08 if iswindows else 0
creationflags = 0
p = subprocess.Popen(cmd, cwd=cwd, stdout=subprocess.PIPE, stderr=stderr, stdin=stdin, creationflags=creationflags)
stderr = p.stderr if as_filter else p.stdout
if as_filter:
@@ -652,7 +648,7 @@ def test(): # {{{
despeckle_image(img)
remove_borders_from_image(img)
image_to_data(img, fmt='GIF')
raw = subprocess.Popen([get_exe_path('JxrDecApp'), '-h'], creationflags=0x08 if iswindows else 0, stdout=subprocess.PIPE).stdout.read()
raw = subprocess.Popen([get_exe_path('JxrDecApp'), '-h'], creationflags=0, stdout=subprocess.PIPE).stdout.read()
if b'JPEG XR Decoder Utility' not in raw:
raise SystemExit('Failed to run JxrDecApp')
# }}}

View File

@@ -6,16 +6,10 @@ import threading
from ebook_converter import force_unicode
from ebook_converter.constants_old import filesystem_encoding
from ebook_converter.constants_old import get_windows_username
from ebook_converter.constants_old import islinux
from ebook_converter.constants_old import iswindows
from ebook_converter.utils.filenames import ascii_filename
__license__ = 'GPL v3'
__copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
VADDRESS = None
@@ -31,34 +25,24 @@ def eintr_retry_call(func, *args, **kwargs):
@functools.lru_cache()
def socket_address(which):
if iswindows:
ans = r'\\.\pipe\Calibre' + which
try:
user = get_windows_username()
except Exception:
user = None
if user:
user = ascii_filename(user).replace(' ', '_')
if user:
ans += '-' + user[:100] + 'x'
user = force_unicode(os.environ.get('USER') or os.path.basename(os.path.expanduser('~')), filesystem_encoding)
sock_name = '{}-calibre-{}.socket'.format(ascii_filename(user).replace(' ', '_'), which)
if islinux:
ans = '\0' + sock_name
else:
user = force_unicode(os.environ.get('USER') or os.path.basename(os.path.expanduser('~')), filesystem_encoding)
sock_name = '{}-calibre-{}.socket'.format(ascii_filename(user).replace(' ', '_'), which)
if islinux:
ans = '\0' + sock_name
else:
from tempfile import gettempdir
tmp = force_unicode(gettempdir(), filesystem_encoding)
ans = os.path.join(tmp, sock_name)
from tempfile import gettempdir
tmp = force_unicode(gettempdir(), filesystem_encoding)
ans = os.path.join(tmp, sock_name)
return ans
def gui_socket_address():
return socket_address('GUI' if iswindows else 'gui')
return socket_address('gui')
def viewer_socket_address():
return socket_address('Viewer' if iswindows else 'viewer')
return socket_address('viewer')
class RC(threading.Thread):

View File

@@ -10,10 +10,6 @@ from collections import namedtuple, OrderedDict
from tempfile import SpooledTemporaryFile
__license__ = 'GPL v3'
__copyright__ = '2012, Kovid Goyal <kovid at kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
HEADER_SIG = 0x04034b50
HEADER_BYTE_SIG = pack(b'<L', HEADER_SIG)
local_header_fmt = b'<L5HL2L2H'
@@ -27,18 +23,8 @@ LocalHeader = namedtuple('LocalHeader',
'filename extra')
if hasattr(sys, 'getwindowsversion'):
windows_reserved_filenames = (
'CON', 'PRN', 'AUX', 'CLOCK$', 'NUL' 'COM0', 'COM1', 'COM2', 'COM3',
'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9' 'LPT0', 'LPT1', 'LPT2',
'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9')
def is_reserved_filename(x):
base = x.partition('.')[0].upper()
return base in windows_reserved_filenames
else:
def is_reserved_filename(x):
return False
def is_reserved_filename(x):
return False
def decode_arcname(name):

View File

@@ -36,11 +36,7 @@ def find_tests():
eq(fname, f.name)
f = share_open(fname, 'rb')
eq(f.read(1), b'a')
if iswindows:
os.rename(fname, fname+'.moved')
os.remove(fname+'.moved')
else:
os.remove(fname)
os.remove(fname)
eq(f.read(1), b'a')
f2 = share_open(fname, 'w+b')
f2.write(b'b' * 10 * 1024)

View File

@@ -1,27 +1,11 @@
import os, sys, re
try:
import ctypes.wintypes
iswindows = True
except ValueError:
iswindows = False
import fcntl, termios, struct
def fmt(code):
return '\033[%dm' % code
if iswindows:
class CONSOLE_SCREEN_BUFFER_INFO(ctypes.Structure):
_fields_ = [
('dwSize', ctypes.wintypes._COORD),
('dwCursorPosition', ctypes.wintypes._COORD),
('wAttributes', ctypes.wintypes.WORD),
('srWindow', ctypes.wintypes._SMALL_RECT),
('dwMaximumWindowSize', ctypes.wintypes._COORD)
]
RATTRIBUTES = dict(
zip(range(1, 9), (
'bold',
@@ -65,21 +49,6 @@ COLORS = {v:fmt(k) for k, v in RCOLORS.items()}
RESET = fmt(0)
if iswindows:
# From wincon.h
WCOLORS = {c:i for i, c in enumerate((
'black', 'blue', 'green', 'cyan', 'red', 'magenta', 'yellow', 'white'))}
def to_flag(fg, bg, bold):
val = 0
if bold:
val |= 0x08
if fg in WCOLORS:
val |= WCOLORS[fg]
if bg in WCOLORS:
val |= (WCOLORS[bg] << 4)
return val
def colored(text, fg=None, bg=None, bold=False):
prefix = []
@@ -100,76 +69,14 @@ def colored(text, fg=None, bg=None, bold=False):
class Detect(object):
def __init__(self, stream):
__import__('pdb').set_trace()
self.stream = stream or sys.stdout
self.isatty = getattr(self.stream, 'isatty', lambda : False)()
force_ansi = False
if not self.isatty and force_ansi:
self.isatty = True
self.isansi = force_ansi or not iswindows
self.isansi = force_ansi or not False
self.set_console = self.write_console = None
self.is_console = False
if not self.isansi:
try:
import msvcrt
self.msvcrt = msvcrt
self.file_handle = msvcrt.get_osfhandle(self.stream.fileno())
from ctypes import windll, wintypes, byref, POINTER, WinDLL
mode = wintypes.DWORD(0)
f = windll.kernel32.GetConsoleMode
f.argtypes, f.restype = [wintypes.HANDLE, POINTER(wintypes.DWORD)], wintypes.BOOL
if f(self.file_handle, byref(mode)):
# Stream is a console
self.set_console = windll.kernel32.SetConsoleTextAttribute
self.default_console_text_attributes = WCOLORS['white']
kernel32 = WinDLL('kernel32', use_last_error=True)
self.write_console = kernel32.WriteConsoleW
self.write_console.argtypes = [wintypes.HANDLE, wintypes.c_wchar_p, wintypes.DWORD, POINTER(wintypes.DWORD), wintypes.LPVOID]
self.write_console.restype = wintypes.BOOL
kernel32.GetConsoleScreenBufferInfo.argtypes = [wintypes.HANDLE, ctypes.POINTER(CONSOLE_SCREEN_BUFFER_INFO)]
kernel32.GetConsoleScreenBufferInfo.restype = wintypes.BOOL
csbi = CONSOLE_SCREEN_BUFFER_INFO()
if kernel32.GetConsoleScreenBufferInfo(self.file_handle, byref(csbi)):
self.default_console_text_attributes = csbi.wAttributes
self.is_console = True
except:
pass
def write_unicode_text(self, text, ignore_errors=False):
' Windows only method that writes unicode strings correctly to the windows console using the Win32 API '
if self.is_console:
from ctypes import wintypes, byref, c_wchar_p
written = wintypes.DWORD(0)
text = text.replace('\0', '')
chunk = len(text)
while text:
t, text = text[:chunk], text[chunk:]
wt = c_wchar_p(t)
text_len = len(t.encode('utf-16'))
if not self.write_console(self.file_handle, wt, text_len, byref(written), None):
# Older versions of windows can fail to write large strings
# to console with WriteConsoleW (seen it happen on Win XP)
import winerror
err = ctypes.get_last_error()
if err == winerror.ERROR_NOT_ENOUGH_MEMORY and chunk >= 128:
# Retry with a smaller chunk size (give up if chunk < 128)
chunk = chunk // 2
text = t + text
continue
if err == winerror.ERROR_GEN_FAILURE:
# On newer windows, this happens when trying to write
# non-ascii chars to the console and the console is set
# to use raster fonts (the default). In this case
# rather than failing, write an informative error
# message and the asciized version of the text.
print('Non-ASCII text detected. You must set your Console\'s font to'
' Lucida Console or Consolas or some other TrueType font to see this text', file=self.stream, end=' -- ')
from ebook_converter.utils.filenames import ascii_text
print(ascii_text(t + text), file=self.stream, end='')
continue
if not ignore_errors:
raise ctypes.WinError(err)
class ColoredStream(Detect):
@@ -178,10 +85,6 @@ class ColoredStream(Detect):
stream = getattr(stream, 'buffer', stream)
Detect.__init__(self, stream)
self.fg, self.bg, self.bold = fg, bg, bold
if self.set_console is not None:
self.wval = to_flag(self.fg, self.bg, bold)
if not self.bg:
self.wval |= self.default_console_text_attributes & 0xF0
def cwrite(self, what):
if not isinstance(what, bytes):
@@ -286,13 +189,6 @@ class ANSIStream(Detect):
def write_plain_text(self, text, start, end):
if start < end:
text = text[start:end]
if self.is_console and isinstance(text, bytes):
try:
utext = text.decode(self.encoding)
except ValueError:
pass
else:
return self.write_unicode_text(utext)
self.polyglot_write(text)
def convert_ansi(self, paramstring, command):
@@ -326,60 +222,11 @@ class ANSIStream(Detect):
fg, bg, bold = None, None, False
self.last_state = (fg, bg, bold)
if fg or bg or bold:
val = to_flag(fg, bg, bold)
if not bg:
val |= self.default_console_text_attributes & 0xF0
self.set_console(self.file_handle, val)
else:
self.set_console(self.file_handle, self.default_console_text_attributes)
def windows_terminfo():
from ctypes import Structure, byref
from ctypes.wintypes import SHORT, WORD
class COORD(Structure):
"""struct in wincon.h"""
_fields_ = [
('X', SHORT),
('Y', SHORT),
]
class SMALL_RECT(Structure):
"""struct in wincon.h."""
_fields_ = [
("Left", SHORT),
("Top", SHORT),
("Right", SHORT),
("Bottom", SHORT),
]
class CONSOLE_SCREEN_BUFFER_INFO(Structure):
"""struct in wincon.h."""
_fields_ = [
("dwSize", COORD),
("dwCursorPosition", COORD),
("wAttributes", WORD),
("srWindow", SMALL_RECT),
("dwMaximumWindowSize", COORD),
]
csbi = CONSOLE_SCREEN_BUFFER_INFO()
import msvcrt
file_handle = msvcrt.get_osfhandle(sys.stdout.fileno())
from ctypes import windll
success = windll.kernel32.GetConsoleScreenBufferInfo(file_handle,
byref(csbi))
if not success:
raise Exception('stdout is not a console?')
return csbi
self.set_console(self.file_handle, self.default_console_text_attributes)
def get_term_geometry():
import fcntl, termios, struct
def ioctl_GWINSZ(fd):
try:
@@ -405,30 +252,10 @@ def get_term_geometry():
def geometry():
if iswindows:
try:
ti = windows_terminfo()
return (ti.dwSize.X or 80, ti.dwSize.Y or 25)
except:
return 80, 25
else:
try:
lines, cols = get_term_geometry()
if lines is not None:
return cols, lines
except Exception:
pass
return 80, 25
def test():
s = ANSIStream()
text = [colored(t, fg=t)+'. '+colored(t, fg=t, bold=True)+'.' for t in
('red', 'yellow', 'green', 'white', 'cyan', 'magenta', 'blue',)]
s.write('\n'.join(text))
u = u'\u041c\u0438\u0445\u0430\u0438\u043b fällen'
print()
s.write_unicode_text(u)
print()
try:
lines, cols = get_term_geometry()
if lines is not None:
return cols, lines
except Exception:
pass
return 80, 25

View File

@@ -340,11 +340,7 @@ class ZipInfo (object):
self.compress_type = ZIP_STORED # Type of compression for the file
self.comment = b"" # Comment for each file
self.extra = b"" # ZIP extra data
if sys.platform == 'win32':
self.create_system = 0 # System which created ZIP archive
else:
# Assume everything else is unix-y
self.create_system = 3 # System which created ZIP archive
self.create_system = 3 # System which created ZIP archive
self.create_version = 20 # Version which created ZIP archive
self.extract_version = 20 # Version needed to extract archive
self.reserved = 0 # Must be zero