investment/onlinesource.py
Frederik Jaeckel 4a3d996600 optimize code
2024-01-21 19:37:48 +01:00

477 lines
15 KiB
Python

# -*- coding: utf-8 -*-
# This file is part of the investment-module from m-ds.de for Tryton.
# The COPYRIGHT file at the top level of this repository contains the
# full copyright notices and license terms.
from string import Template
import requests
import logging
import html2text
import re
from datetime import datetime
from decimal import Decimal
from trytond.model import ModelView, ModelSQL, fields
from trytond.pool import Pool
from trytond.pyson import Eval, Bool
from trytond.i18n import gettext
from trytond.exceptions import UserError
logger = logging.getLogger(__name__)

# Decimal separators offered for 'rgxdecimal'; value and label are the
# same character.
sel_rgxdecimal = [(separator, separator) for separator in ('.', ',')]

# Identifier kinds offered for 'rgxidtype': (stored value, label).
sel_rgxidtype = [
    ('isin', 'ISIN'),
    ('nsin', 'NSIN'),
    ('symbol', 'Symbol'),
]

# Date layouts offered for 'rgxdatefmt': (strptime format, label).
sel_rgxdatefmt = [
    ('%d.%m.%Y', 'dd.mm.yyyy'),
    ('%d.%m.%y', 'dd.mm.yy'),
    ('%m/%d/%Y', 'mm/dd/yyyy'),
    ('%m/%d/%y', 'mm/dd/yy'),
    ('%Y-%m-%d', 'yyyy-mm-dd'),
    ('%b %d %Y', 'mon dd yyyy'),
]

# Field names handed to fields.depends() by the on_change handlers of the
# test values (nsin / isin / symbol).
fields_check = [
    'url',
    'nsin',
    'isin',
    'symbol',
    'text',
    'http_state',
    'fnddate',
    'fndrate',
    'fndident',
]
# States shared by the fields that only apply to the 'web' query method:
# hidden for any other method, mandatory when the method is 'web'.
STATES_WEB = {
    'invisible': Eval('query_method', '') != 'web',
    'required': Eval('query_method', '') == 'web',
    }
# The states above evaluate 'query_method', so fields using them list it
# in their depends.
DEPENDS_WEB = ['query_method']
class OnlineSource(ModelSQL, ModelView):
    'Online Source'
    __name__ = 'investment.source'

    # ---- stored configuration of the source ----
    name = fields.Char(string='Name', required=True)
    query_method = fields.Selection(
        string='Method',
        required=True,
        selection='get_query_methods',
        help='Select the method to retrieve the data.')
    url = fields.Char(
        string='URL',
        states=STATES_WEB,
        depends=DEPENDS_WEB)
    fixed_url = fields.Boolean(
        string='Fixed URL',
        help='URL must be defined at investment record.',
        states={'invisible': Eval('query_method', '') != 'web'},
        depends=DEPENDS_WEB)
    nohtml = fields.Boolean(
        string='Remove HTML',
        help='Removes HTML tags before the text is interpreted.',
        states={'invisible': STATES_WEB['invisible']},
        depends=DEPENDS_WEB)

    # ---- regular expressions applied to the downloaded text ----
    rgxdate = fields.Char(
        string='Date',
        help='Regex code to find the date in the downloaded HTML file.',
        states=STATES_WEB,
        depends=DEPENDS_WEB)
    rgxdatefmt = fields.Selection(
        string='Date format',
        selection=sel_rgxdatefmt,
        states=STATES_WEB,
        depends=DEPENDS_WEB)
    rgxrate = fields.Char(
        string='Rate',
        help='Regex code to find the rate in the downloaded HTML file.',
        states=STATES_WEB,
        depends=DEPENDS_WEB)
    rgxdecimal = fields.Selection(
        string='Decimal Separator',
        help=(
            'Decimal separator for converting the market '
            'value into a number.'),
        selection=sel_rgxdecimal,
        states=STATES_WEB,
        depends=DEPENDS_WEB)
    rgxident = fields.Char(
        string='Identifier',
        help='Regex code to find the identifier in the downloaded HTML file.',
        states={'invisible': STATES_WEB['invisible']},
        depends=DEPENDS_WEB)
    # the identifier type becomes mandatory once an identifier regex is set
    rgxidtype = fields.Selection(
        string='ID-Type',
        selection=sel_rgxidtype,
        help='Type of identifier used to validate the result.',
        states={
            'required': Bool(Eval('rgxident', '')),
            'invisible': STATES_WEB['invisible'],
        },
        depends=DEPENDS_WEB + ['rgxident'])

    # ---- function fields used to test a request from the form ----
    used_url = fields.Function(
        fields.Char(
            string='Used URL',
            readonly=True,
            help='This URL is used to retrieve the HTML file.',
            states={'invisible': STATES_WEB['invisible']},
            depends=DEPENDS_WEB),
        'on_change_with_used_url')
    # test inputs: the setter 'set_test_value' does not store them
    nsin = fields.Function(
        fields.Char(string='NSIN'),
        'on_change_with_nsin', setter='set_test_value')
    isin = fields.Function(
        fields.Char(string='ISIN'),
        'on_change_with_isin', setter='set_test_value')
    symbol = fields.Function(
        fields.Char(string='Symbol'),
        'on_change_with_symbol', setter='set_test_value')
    # test outputs
    http_state = fields.Function(
        fields.Char(string='HTTP-State', readonly=True),
        'on_change_with_http_state')
    text = fields.Function(
        fields.Text(string='Result', readonly=True),
        'on_change_with_text')
    fnddate = fields.Function(
        fields.Date(
            string='Date',
            readonly=True,
            help='Date found during test query.'),
        'on_change_with_fnddate')
    fndrate = fields.Function(
        fields.Numeric(
            string='Rate',
            readonly=True,
            digits=(16, 4),
            help='Rate found during test query.'),
        'on_change_with_fndrate')
    fndident = fields.Function(
        fields.Char(
            string='Identifier',
            readonly=True,
            help='Identifier found during test query.'),
        'on_change_with_fndident')
@classmethod
def __setup__(cls):
    """Run the inherited setup, then make ``name`` (descending) the first
    entry of the model's ordering.
    """
    super().__setup__()
    name_descending = ('name', 'DESC')
    cls._order.insert(0, name_descending)
@classmethod
def default_query_method(cls):
    """Return the default query method: ``'web'``."""
    return 'web'
@classmethod
def default_url(cls):
    """Return the default URL: just the ``https://`` scheme prefix."""
    return 'https://'
@classmethod
def default_rgxdate(cls):
    """Return the default date regex; it captures a ``dd.mm.yyyy`` date."""
    return r'(\d{2}\.\d{2}\.\d{4})'
@classmethod
def default_rgxdatefmt(cls):
    """Return the default date format: ``dd.mm.yyyy`` (``%d.%m.%Y``)."""
    return '%d.%m.%Y'
@classmethod
def default_rgxrate(cls):
    """Return the default rate regex; it captures ``nn,nn`` (digits, a
    comma, digits).
    """
    return r'(\d+,\d+)'
@classmethod
def default_rgxidtype(cls):
    """Return the default identifier type: ``'isin'``."""
    return 'isin'
@classmethod
def default_rgxdecimal(cls):
    """Return the default decimal separator: a comma."""
    return ','
@classmethod
def default_nohtml(cls):
    """Return the default for 'Remove HTML': enabled (``True``)."""
    return True
@classmethod
def default_fixed_url(cls):
    """Return the default for 'Fixed URL': disabled (``False``)."""
    return False
@fields.depends(*fields_check)
def on_change_nsin(self):
    """Change hook for the ``nsin`` test value: run the test query with
    the current values.
    """
    self.call_online_source()
@fields.depends(*fields_check)
def on_change_isin(self):
    """Change hook for the ``isin`` test value: run the test query with
    the current values.
    """
    self.call_online_source()
@fields.depends(*fields_check)
def on_change_symbol(self):
    """Change hook for the ``symbol`` test value: run the test query with
    the current values.
    """
    self.call_online_source()
def on_change_with_fnddate(self, name=None):
    """Getter of the function field ``fnddate``; always ``None``.

    The field only carries a value after ``call_online_source`` assigned
    one during a test query.
    """
    return None
def on_change_with_fndrate(self, name=None):
    """Getter of the function field ``fndrate``; always ``None``.

    The field only carries a value after ``call_online_source`` assigned
    one during a test query.
    """
    return None
def on_change_with_fndident(self, name=None):
    """Getter of the function field ``fndident``; always an empty string.

    The field only carries a value after ``call_online_source`` assigned
    one during a test query.
    """
    return ''
def on_change_with_http_state(self, name=None):
    """Getter of the function field ``http_state``; always an empty string.

    The field only carries a value after ``call_online_source`` assigned
    one during a test query.

    ``name`` is unused; its default is ``None`` like in every other
    ``on_change_with_*`` getter of this model (it used to be ``True``).
    """
    return ''
def on_change_with_text(self, name=None):
    """Getter of the function field ``text``; always an empty string.

    The field only carries a value after ``call_online_source`` assigned
    one during a test query.
    """
    return ''
def on_change_with_nsin(self, name=None):
    """Getter of the test input ``nsin``; always an empty string, since
    the value is never stored (see the setter ``set_test_value``).
    """
    return ''
def on_change_with_isin(self, name=None):
    """Getter of the test input ``isin``; always an empty string, since
    the value is never stored (see the setter ``set_test_value``).
    """
    return ''
def on_change_with_symbol(self, name=None):
    """Getter of the test input ``symbol``; always an empty string, since
    the value is never stored (see the setter ``set_test_value``).
    """
    return ''
@fields.depends('url', 'isin', 'nsin', 'symbol', 'fixed_url')
def on_change_with_used_url(self, name=None):
    """Return the URL a test query would request.

    The source's own ``url`` is combined with the current test values
    (isin / nsin / symbol). Returns ``None`` while no URL is set.
    """
    if not self.url:
        return None
    test_values = {
        'isin': self.isin,
        'nsin': self.nsin,
        'symbol': self.symbol,
    }
    return self.get_url_with_parameter(url=self.url, **test_values)
@classmethod
def get_query_methods(cls):
    """Return the selection list of query methods.

    Only ``'web'`` is offered here; its label is the translated message
    ``investment.msg_querytype_web``.
    """
    web_label = gettext('investment.msg_querytype_web')
    return [('web', web_label)]
@classmethod
def set_test_value(cls, record, name, value):
    """Setter of the test inputs (nsin / isin / symbol).

    Deliberately does nothing: test values are not stored.
    """
    return None
@classmethod
def validate(cls, records):
    """Check that the regex codes of each record can be compiled.

    Every regex field is run against an empty text; ``get_regex_result``
    raises ``UserError`` when a pattern is invalid and simply returns
    ``None`` for an empty pattern, so no extra emptiness check is needed
    here (the former ``if x:`` tested the field *name* and was always
    true).
    """
    # keep the validation chain of the parent classes intact
    super().validate(records)
    for record in records:
        for field_name in ('rgxdate', 'rgxrate', 'rgxident'):
            record.get_regex_result('', field_name)
@classmethod
def run_query_method(cls, osource, isin, nsin, symbol, url, debug=False):
    """Run the query method selected on ``osource`` to retrieve data.

    For the method ``'web'`` the result of ``read_from_website`` is
    returned, a dict that may contain:

    * ``'text'``: raw text from the query - for debug,
    * ``'http_state'``: state of the query,
    * ``'date'``: date() on success,
    * ``'rate'``: Decimal() on success,
    * ``'code'``: identifier - isin/nsin/symbol.

    For any other (or missing) query method ``None`` is returned.
    """
    Source = Pool().get('investment.source')

    selected_method = getattr(osource, 'query_method', None)
    if selected_method != 'web':
        return None

    query_values = {
        'isin': isin,
        'nsin': nsin,
        'symbol': symbol,
        'url': url,
    }
    return Source.read_from_website(osource, debug=debug, **query_values)
def call_online_source(self):
    """Run a test query with the current (possibly unsaved) values.

    The query is executed in debug mode and its result is copied into
    the test-output fields ``text``, ``http_state``, ``fnddate``,
    ``fndrate`` and ``fndident``. Nothing is changed when the query
    returns no result.
    """
    OSourc = Pool().get('investment.source')

    # Keyword arguments on purpose: the positional call used to pass
    # self.url as 'symbol' and self.symbol as 'url'.
    result = OSourc.run_query_method(
        self,
        isin=self.isin,
        nsin=self.nsin,
        symbol=self.symbol,
        url=self.url,
        debug=True)
    if result:
        self.text = result.get('text', None)
        self.http_state = result.get('http_state', None)
        self.fnddate = result.get('date', None)
        self.fndrate = result.get('rate', None)
        self.fndident = result.get('code', None)
def get_url_with_parameter(
        self, isin=None, nsin=None, symbol=None, url=None):
    """Build the URL to request.

    With ``fixed_url`` enabled the given ``url`` is returned unchanged;
    a missing ``url`` raises ``UserError``. Otherwise the source's own
    ``url`` is treated as a ``string.Template`` in which ``isin``,
    ``nsin`` and ``symbol`` are substituted (``None`` becomes an empty
    string). Returns ``None`` when the source has no URL.
    """
    if self.fixed_url is True:
        if url:
            return url
        raise UserError(gettext(
            'investment.msg_missing_url',
            oname=self.rec_name))

    if not self.url:
        return None

    given = (('isin', isin), ('nsin', nsin), ('symbol', symbol))
    placeholders = {
        key: ('' if value is None else value) for key, value in given}
    return Template(self.url).substitute(placeholders)
@classmethod
def update_rate(cls, asset):
    """Query the asset's online sources and store a newly found rate.

    The sources in ``asset.updtsources`` are tried in order. A result is
    skipped when the source has an identifier regex and the identifier
    found differs from the asset's one. The first result with both a
    date and a rate for which the asset has no rate yet is created.

    Returns ``True`` when a rate was created, ``False`` when no source
    delivered a new rate, ``None`` when the asset has no sources.
    """
    pool = Pool()
    Rate = pool.get('investment.rate')
    IrDate = pool.get('ir.date')

    if not asset.updtsources:
        return

    # identifier type of the source -> attribute of the asset holding it
    code_attributes = {
        'isin': 'isin',
        'nsin': 'wkn',
        'symbol': 'secsymb',
    }

    for updtsource in asset.updtsources:
        rate_data = cls.run_query_method(
            updtsource,
            isin=asset.isin,
            nsin=asset.wkn,
            symbol=asset.secsymb,
            url=asset.updturl)
        if rate_data is None:
            # run_query_method() yields no result for a query method it
            # does not handle - nothing to evaluate, try the next source
            continue

        if updtsource.rgxident:
            # check result - same code?
            code = rate_data.get('code', None)
            if code:
                asset_code = getattr(
                    asset, code_attributes[updtsource.rgxidtype])
                if (asset_code or '').lower() != code.lower():
                    # fail
                    logger.warning(
                        'update_rate: got wrong code '
                        '"%(wrong)s" - expected "%(exp)s"', {
                            'exp': asset_code,
                            'wrong': code})
                    continue

        to_create = {
            'date': rate_data.get('date', None),
            'rate': rate_data.get('rate', None),
            'asset': asset.id}
        if to_create['date'] is None or to_create['rate'] is None:
            continue

        # check if exists
        already_stored = Rate.search_count([
            ('asset.id', '=', asset.id),
            ('date', '=', to_create['date'])])
        if already_stored == 0:
            Rate.create([to_create])
            return True
        # if we got a record for today - stop
        # otherwise try next source
        # NOTE(review): this branch is read as the 'else' of the
        # existence check above - confirm against the original layout
        if to_create['date'] == IrDate.today():
            break
    return False
def get_regex_result(self, html_text, field_name):
    """Run the regex stored in ``field_name`` on ``html_text`` and
    convert the match.

    The first capture group is used; a pattern without groups falls
    back to the whole match.

    * ``'rgxrate'``: the match is converted to ``Decimal`` using
      ``self.rgxdecimal`` as decimal separator (the other one of
      ``,`` / ``.`` is dropped as grouping character),
    * ``'rgxdate'``: the match is parsed with ``self.rgxdatefmt`` into
      a ``date``,
    * any other field: the matched text is returned as is.

    Returns ``None`` when the pattern is empty, nothing matched, or the
    match cannot be converted.

    Raises ``UserError`` when the pattern cannot be compiled.
    """
    rgxcode = getattr(self, field_name) or ''
    if not rgxcode:
        return None

    try:
        pattern = re.compile(rgxcode)
    except (re.error, OverflowError, RecursionError) as e1:
        # these are the errors raised for a broken pattern; the model
        # class is only needed here, to label the failing field
        OSource = Pool().get('investment.source')
        raise UserError(gettext(
            'investment.msg_bug_in_regexquery',
            errmsg=str(e1),
            fname=getattr(OSource, field_name).string,
            code=rgxcode)) from e1

    search_result = pattern.search(html_text)
    if search_result is None:
        return None

    try:
        result = search_result.group(1)
    except IndexError:
        # pattern without capture group: use the whole match
        result = search_result.group(0)
    if result is None:
        # group 1 exists but did not take part in the match
        return None

    if field_name == 'rgxrate':
        decimal_sep = self.rgxdecimal
        if decimal_sep not in (',', '.'):
            # without a known separator the number cannot be interpreted
            return None
        grouping_sep = '.' if decimal_sep == ',' else ','
        number = result.replace(grouping_sep, '').replace(decimal_sep, '.')
        try:
            return Decimal(number)
        except ArithmeticError:
            # decimal.InvalidOperation (an ArithmeticError): not a number
            return None

    if field_name == 'rgxdate':
        try:
            return datetime.strptime(result, self.rgxdatefmt).date()
        except (TypeError, ValueError):
            # format not set, or the text does not fit the format
            return None

    return result
@classmethod
def read_from_website(
        cls, updtsource, isin=None, nsin=None,
        symbol=None, url=None, debug=False):
    """Download the page of ``updtsource`` and extract the values.

    Returns a dict that may contain:

    * ``'http_state'``: ``'<status code>: <reason>'`` of the response,
      or the error text when the request itself failed,
    * ``'rate'``, ``'date'``, ``'code'``: results of the source's
      regexes, set for the status codes 200, 204 and 410,
    * ``'text'``: the evaluated text (only with ``debug``), or
      ``'invalid url'`` when the source still has the bare default URL.

    A failing request (timeout, connection error, unusable URL) is
    logged and reported in the result instead of being raised, so a
    caller can go on with another source. ``UserError`` from
    ``get_url_with_parameter`` is not caught.
    """
    result = {}

    if updtsource.url == 'https://':
        # only the scheme prefix was entered - nothing to request
        result['text'] = 'invalid url'
        return result

    query_url = updtsource.get_url_with_parameter(
        isin=isin,
        nsin=nsin,
        symbol=symbol,
        url=url)
    try:
        res1 = requests.get(
            query_url,
            allow_redirects=True,
            timeout=5.0)
    except requests.RequestException as exc:
        logger.error(
            'read_from_website: request failed, url: %s, error: %s',
            query_url, exc)
        result['http_state'] = str(exc)
        if debug:
            result['text'] = str(exc)
        return result

    result['http_state'] = '%(code)d: %(msg)s' % {
        'code': res1.status_code,
        'msg': res1.reason,
        }

    # NOTE(review): 410 is evaluated like a success - confirm intent
    if res1.status_code in [200, 204, 410]:
        html = res1.text

        # remove html-tags
        if updtsource.nohtml:
            converter = html2text.HTML2Text()
            converter.ignore_links = True
            converter.ignore_tables = True
            converter.bypass_tables = False
            converter.single_line_break = True
            converter.body_width = 0
            html = converter.handle(html)

        if debug:
            result['text'] = html

        result['rate'] = updtsource.get_regex_result(html, 'rgxrate')
        result['date'] = updtsource.get_regex_result(html, 'rgxdate')
        result['code'] = updtsource.get_regex_result(html, 'rgxident')
    else:
        logger.error(
            'read_from_website: '
            '%(code)s, url: %(url)s, redirects: [%(redirects)s]', {
                'code': res1.status_code,
                'url': res1.url,
                'redirects': ', '.join([x.url for x in res1.history]),
            })
        if debug:
            result['text'] = res1.text
    return result
# end OnlineSource