Tweaking a few things oauth things.
This commit is contained in:
235
rtv/oauth.py
235
rtv/oauth.py
@@ -1,185 +1,120 @@
|
||||
import curses
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
import uuid
|
||||
import webbrowser
|
||||
|
||||
import praw
|
||||
from six.moves import configparser
|
||||
|
||||
from . import config
|
||||
from .curses_helpers import show_notification, prompt_input
|
||||
|
||||
from tornado import gen, ioloop, web, httpserver
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
__all__ = ['token_validity', 'OAuthTool']
|
||||
_logger = logging.getLogger(__name__)
|
||||
from . import config
|
||||
from .curses_helpers import show_notification, prompt_input
|
||||
from .helpers import check_browser_display, open_browser
|
||||
|
||||
__all__ = ['OAuthTool']
|
||||
|
||||
oauth_state = None
|
||||
oauth_code = None
|
||||
oauth_error = None
|
||||
|
||||
class HomeHandler(web.RequestHandler):
|
||||
|
||||
def get(self):
|
||||
self.render('home.html')
|
||||
|
||||
class AuthHandler(web.RequestHandler):
|
||||
|
||||
def initialize(self):
|
||||
self.compact = os.environ.get('BROWSER') in ['w3m', 'links', 'elinks', 'lynx']
|
||||
|
||||
def get(self):
|
||||
global oauth_state
|
||||
global oauth_code
|
||||
global oauth_error
|
||||
global oauth_state, oauth_code, oauth_error
|
||||
|
||||
oauth_state = self.get_argument('state', default='state_placeholder')
|
||||
oauth_code = self.get_argument('code', default='code_placeholder')
|
||||
oauth_error = self.get_argument('error', default='error_placeholder')
|
||||
oauth_state = self.get_argument('state', default='placeholder')
|
||||
oauth_code = self.get_argument('code', default='placeholder')
|
||||
oauth_error = self.get_argument('error', default='placeholder')
|
||||
|
||||
self.render('auth.html', state=oauth_state, code=oauth_code, error=oauth_error)
|
||||
self.render('index.html', state=oauth_state, code=oauth_code, error=oauth_error)
|
||||
|
||||
# Stop IOLoop if using BackgroundBrowser (or GUI browser)
|
||||
if not self.compact:
|
||||
# Stop IOLoop if using a background browser such as firefox
|
||||
if check_browser_display():
|
||||
ioloop.IOLoop.current().stop()
|
||||
|
||||
class OAuthTool(object):
|
||||
|
||||
def __init__(self, reddit, stdscr=None, loader=None,
|
||||
client_id=None, redirect_uri=None, scope=None):
|
||||
def __init__(self, reddit, stdscr=None, loader=None):
|
||||
|
||||
self.reddit = reddit
|
||||
self.stdscr = stdscr
|
||||
self.loader = loader
|
||||
|
||||
self.config = configparser.ConfigParser()
|
||||
self.config_fp = None
|
||||
|
||||
self.client_id = client_id or config.oauth_client_id
|
||||
# Comply with PRAW's desperate need for client secret
|
||||
self.client_secret = config.oauth_client_secret
|
||||
self.redirect_uri = redirect_uri or config.oauth_redirect_uri
|
||||
|
||||
self.scope = scope or config.oauth_scope.split('-')
|
||||
self.access_info = {}
|
||||
|
||||
# Terminal web browser
|
||||
self.compact = os.environ.get('BROWSER') in ['w3m', 'links', 'elinks', 'lynx']
|
||||
|
||||
# Initialize Tornado webapp
|
||||
self.callback_app = web.Application([
|
||||
(r'/', HomeHandler),
|
||||
(r'/auth', AuthHandler),
|
||||
], template_path='rtv/templates')
|
||||
|
||||
self.http_server = None
|
||||
|
||||
def get_config_fp(self):
|
||||
HOME = os.path.expanduser('~')
|
||||
XDG_CONFIG_HOME = os.getenv('XDG_CONFIG_HOME',
|
||||
os.path.join(HOME, '.config'))
|
||||
self.refresh_token = config.load_refresh_token()
|
||||
|
||||
if os.path.exists(os.path.join(XDG_CONFIG_HOME, 'rtv')):
|
||||
file_path = os.path.join(XDG_CONFIG_HOME, 'rtv', 'oauth.cfg')
|
||||
else:
|
||||
file_path = os.path.join(HOME, '.rtv-oauth')
|
||||
# Initialize Tornado webapp
|
||||
routes = [('/', AuthHandler)]
|
||||
self.callback_app = web.Application(routes, template_path='templates')
|
||||
|
||||
return file_path
|
||||
self.reddit.set_oauth_app_info(config.oauth_client_id,
|
||||
config.oauth_client_secret,
|
||||
config.oauth_redirect_uri)
|
||||
|
||||
def open_config(self, update=False):
|
||||
if self.config_fp is None:
|
||||
self.config_fp = self.get_config_fp()
|
||||
|
||||
if update:
|
||||
self.config.read(self.config_fp)
|
||||
|
||||
def save_config(self):
|
||||
self.open_config()
|
||||
with open(self.config_fp, 'w') as cfg:
|
||||
self.config.write(cfg)
|
||||
|
||||
def clear_oauth_data(self):
|
||||
self.open_config(update=True)
|
||||
if self.config.has_section('oauth') and self.config.has_option('oauth', 'refresh_token'):
|
||||
self.config.remove_option('oauth', 'refresh_token')
|
||||
self.save_config()
|
||||
|
||||
@gen.coroutine
|
||||
def open_terminal_browser(self, url):
|
||||
with ThreadPoolExecutor(max_workers=1) as executor:
|
||||
yield executor.submit(webbrowser.open_new_tab, url)
|
||||
|
||||
ioloop.IOLoop.current().stop()
|
||||
# Reddit's mobile website works better on terminal browsers
|
||||
if not check_browser_display():
|
||||
if '.compact' not in self.reddit.config.API_PATHS['authorize']:
|
||||
self.reddit.config.API_PATHS['authorize'] += '.compact'
|
||||
|
||||
def authorize(self):
|
||||
if self.compact and not '.compact' in self.reddit.config.API_PATHS['authorize']:
|
||||
self.reddit.config.API_PATHS['authorize'] += '.compact'
|
||||
|
||||
self.reddit.set_oauth_app_info(self.client_id,
|
||||
self.client_secret,
|
||||
self.redirect_uri)
|
||||
|
||||
self.open_config(update=True)
|
||||
# If no previous OAuth data found, starting from scratch
|
||||
if not self.config.has_section('oauth') or not self.config.has_option('oauth', 'refresh_token'):
|
||||
if self.http_server is None:
|
||||
self.http_server = httpserver.HTTPServer(self.callback_app)
|
||||
self.http_server.listen(65000)
|
||||
|
||||
# Generate a random UUID
|
||||
hex_uuid = uuid.uuid4().hex
|
||||
|
||||
permission_ask_page_link = self.reddit.get_authorize_url(str(hex_uuid),
|
||||
scope=self.scope, refreshable=True)
|
||||
|
||||
if self.compact:
|
||||
show_notification(self.stdscr, ['Opening ' + os.environ.get('BROWSER')])
|
||||
curses.endwin()
|
||||
ioloop.IOLoop.current().add_callback(self.open_terminal_browser, permission_ask_page_link)
|
||||
ioloop.IOLoop.current().start()
|
||||
curses.doupdate()
|
||||
else:
|
||||
with self.loader(message='Waiting for authorization'):
|
||||
webbrowser.open(permission_ask_page_link)
|
||||
ioloop.IOLoop.current().start()
|
||||
|
||||
global oauth_state
|
||||
global oauth_code
|
||||
global oauth_error
|
||||
|
||||
self.final_state = oauth_state
|
||||
self.final_code = oauth_code
|
||||
self.final_error = oauth_error
|
||||
|
||||
# Check if access was denied
|
||||
if self.final_error == 'access_denied':
|
||||
show_notification(self.stdscr, ['Declined access'])
|
||||
return
|
||||
elif self.final_error != 'error_placeholder':
|
||||
show_notification(self.stdscr, ['Authentication error'])
|
||||
return
|
||||
|
||||
# Check if UUID matches obtained state
|
||||
# (if not, authorization process is compromised, and I'm giving up)
|
||||
if hex_uuid != self.final_state:
|
||||
show_notification(self.stdscr, ['UUID mismatch, stopping.'])
|
||||
return
|
||||
|
||||
try:
|
||||
with self.loader(message='Logging in'):
|
||||
# Get access information (tokens and scopes)
|
||||
self.access_info = self.reddit.get_access_information(self.final_code)
|
||||
except (praw.errors.OAuthAppRequired, praw.errors.OAuthInvalidToken) as e:
|
||||
show_notification(self.stdscr, ['Invalid OAuth data'])
|
||||
else:
|
||||
if not self.config.has_section('oauth'):
|
||||
self.config.add_section('oauth')
|
||||
|
||||
self.config.set('oauth', 'refresh_token', self.access_info['refresh_token'])
|
||||
self.save_config()
|
||||
# Otherwise, fetch new access token
|
||||
else:
|
||||
# If we already have a token, request new access credentials
|
||||
if self.refresh_token:
|
||||
with self.loader(message='Logging in'):
|
||||
self.reddit.refresh_access_information(self.config.get('oauth', 'refresh_token'))
|
||||
self.reddit.refresh_access_information(self.refresh_token)
|
||||
return
|
||||
|
||||
# Start the authorization callback server
|
||||
if self.http_server is None:
|
||||
self.http_server = httpserver.HTTPServer(self.callback_app)
|
||||
self.http_server.listen(config.oauth_redirect_port)
|
||||
|
||||
hex_uuid = uuid.uuid4().hex
|
||||
authorize_url = self.reddit.get_authorize_url(
|
||||
hex_uuid, scope=config.oauth_scope, refreshable=True)
|
||||
|
||||
# Open the browser and wait for the user to authorize the app
|
||||
if check_browser_display():
|
||||
with self.loader(message='Waiting for authorization'):
|
||||
open_browser(authorize_url)
|
||||
ioloop.IOLoop.current().start()
|
||||
else:
|
||||
with self.loader(delay=0, message='Redirecting to reddit'):
|
||||
# Provide user feedback
|
||||
time.sleep(1)
|
||||
ioloop.IOLoop.current().add_callback(self._open_authorize_url,
|
||||
authorize_url)
|
||||
ioloop.IOLoop.current().start()
|
||||
|
||||
if oauth_error == 'access_denied':
|
||||
show_notification(self.stdscr, ['Declined access'])
|
||||
return
|
||||
elif oauth_error != 'placeholder':
|
||||
show_notification(self.stdscr, ['Authentication error'])
|
||||
return
|
||||
elif hex_uuid != oauth_state:
|
||||
# Check if UUID matches obtained state.
|
||||
# If not, authorization process is compromised.
|
||||
show_notification(self.stdscr, ['UUID mismatch'])
|
||||
return
|
||||
|
||||
try:
|
||||
with self.loader(message='Logging in'):
|
||||
access_info = self.reddit.get_access_information(oauth_code)
|
||||
config.save_refresh_token(access_info['refresh_token'])
|
||||
self.refresh_token = access_info['refresh_token']
|
||||
except (praw.errors.OAuthAppRequired, praw.errors.OAuthInvalidToken):
|
||||
show_notification(self.stdscr, ['Invalid OAuth data'])
|
||||
else:
|
||||
message = ['Welcome {}!'.format(self.reddit.user.name)]
|
||||
show_notification(self.stdscr, message)
|
||||
|
||||
def clear_oauth_data(self):
|
||||
self.reddit.clear_authentication()
|
||||
config.clear_refresh_token()
|
||||
self.refresh_token = None
|
||||
|
||||
@gen.coroutine
|
||||
def _open_authorize_url(self, url):
|
||||
with ThreadPoolExecutor(max_workers=1) as executor:
|
||||
yield executor.submit(open_browser, url)
|
||||
ioloop.IOLoop.current().stop()
|
||||
|
||||
Reference in New Issue
Block a user