Bundling webserver into RTV
This commit is contained in:
67
rtv/oauth.py
67
rtv/oauth.py
@@ -1,4 +1,4 @@
|
||||
import configparser
|
||||
from six.moves import configparser
|
||||
import curses
|
||||
import logging
|
||||
import os
|
||||
@@ -11,11 +11,37 @@ import praw
|
||||
from . import config
|
||||
from .curses_helpers import show_notification, prompt_input
|
||||
|
||||
from tornado import ioloop, web
|
||||
|
||||
__all__ = ['token_validity', 'OAuthTool']
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
token_validity = 3540
|
||||
|
||||
oauth_state = None
|
||||
oauth_code = None
|
||||
oauth_error = None
|
||||
|
||||
class HomeHandler(web.RequestHandler):
|
||||
|
||||
def get(self):
|
||||
self.render('home.html')
|
||||
|
||||
class AuthHandler(web.RequestHandler):
|
||||
|
||||
def get(self):
|
||||
global oauth_state
|
||||
global oauth_code
|
||||
global oauth_error
|
||||
|
||||
oauth_state = self.get_argument('state', default='state_placeholder')
|
||||
oauth_code = self.get_argument('code', default='code_placeholder')
|
||||
oauth_error = self.get_argument('error', default='error_placeholder')
|
||||
|
||||
self.render('auth.html', state=oauth_state, code=oauth_code, error=oauth_error)
|
||||
|
||||
ioloop.IOLoop.current().stop()
|
||||
|
||||
class OAuthTool(object):
|
||||
|
||||
def __init__(self, reddit, stdscr=None, loader=None,
|
||||
@@ -38,6 +64,13 @@ class OAuthTool(object):
|
||||
|
||||
self.token_expiration = 0
|
||||
|
||||
# Initialize Tornado webapp and listen on port 65000
|
||||
self.callback_app = web.Application([
|
||||
(r'/', HomeHandler),
|
||||
(r'/auth', AuthHandler),
|
||||
], template_path='rtv/templates')
|
||||
self.callback_app.listen(65000)
|
||||
|
||||
def get_config_fp(self):
|
||||
HOME = os.path.expanduser('~')
|
||||
XDG_CONFIG_HOME = os.getenv('XDG_CONFIG_HOME',
|
||||
@@ -105,27 +138,37 @@ class OAuthTool(object):
|
||||
permission_ask_page_link = self.reddit.get_authorize_url(str(hex_uuid),
|
||||
scope=self.scope, refreshable=True)
|
||||
|
||||
webbrowser.open(permission_ask_page_link)
|
||||
show_notification(self.stdscr, ['Access prompt opened in web browser'])
|
||||
with self.loader(message='Waiting for authorization'):
|
||||
webbrowser.open(permission_ask_page_link)
|
||||
|
||||
final_state = prompt_input(self.stdscr, 'State: ')
|
||||
final_code = prompt_input(self.stdscr, 'Code: ')
|
||||
ioloop.IOLoop.current().start()
|
||||
|
||||
if not final_state or not final_code:
|
||||
curses.flash()
|
||||
global oauth_state
|
||||
global oauth_code
|
||||
global oauth_error
|
||||
|
||||
self.final_state = oauth_state
|
||||
self.final_code = oauth_code
|
||||
self.final_error = oauth_error
|
||||
|
||||
# Check if access was denied
|
||||
if self.final_error == 'access_denied':
|
||||
show_notification(self.stdscr, ['Declined access'])
|
||||
return
|
||||
elif self.final_error != 'error_placeholder':
|
||||
show_notification(self.stdscr, ['Authentication error'])
|
||||
return
|
||||
|
||||
# Check if UUID matches obtained state
|
||||
# (if not, authorization process is compromised, and I'm giving up)
|
||||
if hex_uuid != final_state:
|
||||
elif hex_uuid != self.final_state:
|
||||
show_notification(self.stdscr, ['UUID mismatch, stopping.'])
|
||||
return
|
||||
|
||||
# Get access information (tokens and scopes)
|
||||
self.access_info = self.reddit.get_access_information(final_code)
|
||||
|
||||
try:
|
||||
with self.loader(message='Logging in'):
|
||||
# Get access information (tokens and scopes)
|
||||
self.access_info = self.reddit.get_access_information(self.final_code)
|
||||
|
||||
self.reddit.set_access_credentials(
|
||||
scope=set(self.access_info['scope']),
|
||||
access_token=self.access_info['access_token'],
|
||||
|
||||
Reference in New Issue
Block a user