investment/onlinesource.py

446 lines
14 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
# This file is part of the investment-module from m-ds.de for Tryton.
# The COPYRIGHT file at the top level of this repository contains the
# full copyright notices and license terms.
2022-11-20 19:40:51 +00:00
import logging
import re
from datetime import datetime
from decimal import Decimal, InvalidOperation
from string import Template

import html2text
import requests

from trytond.model import ModelView, ModelSQL, fields
from trytond.pool import Pool
from trytond.pyson import Eval, Bool
from trytond.i18n import gettext
2022-11-18 23:21:52 +00:00
logger = logging.getLogger(__name__)

# Selection values: decimal separator used when parsing a scraped rate.
sel_rgxdecimal = [
    ('.', '.'),
    (',', ','),
]

# Selection values: which identifier of the asset a scraped code is
# compared against.
sel_rgxidtype = [
    ('isin', 'ISIN'),
    ('nsin', 'NSIN'),
    ('symbol', 'Symbol'),
]

# Selection values: strptime() format (key) and its human readable
# label (value) for parsing a scraped date.
sel_rgxdatefmt = [
    ('%d.%m.%Y', 'dd.mm.yyyy'),
    ('%d.%m.%y', 'dd.mm.yy'),
    ('%m/%d/%Y', 'mm/dd/yyyy'),
    ('%m/%d/%y', 'mm/dd/yy'),
    ('%Y-%m-%d', 'yyyy-mm-dd'),
    ('%b %d %Y', 'mon dd yyyy'),
]

# Field names passed to fields.depends() by the on_change handlers of
# the test inputs (nsin / isin / symbol).
fields_check = [
    'url',
    'nsin',
    'isin',
    'symbol',
    'text',
    'http_state',
    'fnddate',
    'fndrate',
    'fndident',
]
# States shared by the settings that only make sense for the 'web'
# query method: hidden for any other method, mandatory for 'web'.
STATES_WEB = {
    'invisible': Eval('query_method', '') != 'web',
    'required': Eval('query_method', '') == 'web',
}

# Field the states above are evaluated against.
DEPENDS_WEB = ['query_method']
class OnlineSource(ModelSQL, ModelView):
    'Online Source'
    __name__ = 'investment.source'

    # ---- stored configuration of the source --------------------------
    name = fields.Char(string='Name', required=True)

    # the available methods are provided by get_query_methods()
    query_method = fields.Selection(
        string='Method',
        required=True,
        help='Select the method to retrieve the data.',
        selection='get_query_methods')

    url = fields.Char(
        string='URL',
        states=STATES_WEB,
        depends=DEPENDS_WEB)

    nohtml = fields.Boolean(
        string='Remove HTML',
        help='Removes HTML tags before the text is interpreted.',
        states={
            'invisible': STATES_WEB['invisible'],
        },
        depends=DEPENDS_WEB)

    rgxdate = fields.Char(
        string='Date',
        help='Regex code to find the date in the downloaded HTML file.',
        states=STATES_WEB,
        depends=DEPENDS_WEB)

    rgxdatefmt = fields.Selection(
        string='Date format',
        selection=sel_rgxdatefmt,
        states=STATES_WEB,
        depends=DEPENDS_WEB)

    rgxrate = fields.Char(
        string='Rate',
        help='Regex code to find the rate in the downloaded HTML file.',
        states=STATES_WEB,
        depends=DEPENDS_WEB)

    rgxdecimal = fields.Selection(
        string='Decimal Separator',
        help='Decimal separator for converting the market value into a number.',
        selection=sel_rgxdecimal,
        states=STATES_WEB,
        depends=DEPENDS_WEB)

    # optional: regex for an identifier, only hidden (never required)
    rgxident = fields.Char(
        string='Identifier',
        help='Regex code to find the identifier in the downloaded HTML file.',
        states={
            'invisible': STATES_WEB['invisible'],
        },
        depends=DEPENDS_WEB)

    # becomes mandatory as soon as an identifier regex is entered
    rgxidtype = fields.Selection(
        string='ID-Type',
        selection=sel_rgxidtype,
        help='Type of identifier used to validate the result.',
        states={
            'required': Bool(Eval('rgxident', '')),
            'invisible': STATES_WEB['invisible'],
        },
        depends=DEPENDS_WEB+['rgxident'])

    # ---- function fields used to test a request ----------------------
    used_url = fields.Function(
        fields.Char(
            string='Used URL',
            readonly=True,
            help='This URL is used to retrieve the HTML file.',
            states={'invisible': STATES_WEB['invisible']},
            depends=DEPENDS_WEB),
        'on_change_with_used_url')

    # test inputs; their setter 'set_test_value' does not store anything
    nsin = fields.Function(
        fields.Char(string='NSIN'),
        'on_change_with_nsin', setter='set_test_value')
    isin = fields.Function(
        fields.Char(string='ISIN'),
        'on_change_with_isin', setter='set_test_value')
    symbol = fields.Function(
        fields.Char(string='Symbol'),
        'on_change_with_symbol', setter='set_test_value')

    # test outputs, assigned by call_online_source()
    http_state = fields.Function(
        fields.Char(string='HTTP-State', readonly=True),
        'on_change_with_http_state')
    text = fields.Function(
        fields.Text(string='Result', readonly=True),
        'on_change_with_text')
    fnddate = fields.Function(
        fields.Date(
            string='Date',
            readonly=True,
            help='Date found during test query.'),
        'on_change_with_fnddate')
    fndrate = fields.Function(
        fields.Numeric(
            string='Rate',
            readonly=True,
            help='Rate found during test query.',
            digits=(16, 4)),
        'on_change_with_fndrate')
    fndident = fields.Function(
        fields.Char(
            string='Identifier',
            readonly=True,
            help='Identifier found during test query.'),
        'on_change_with_fndident')
2022-11-18 23:21:52 +00:00
@classmethod
def __setup__(cls):
    """Run the inherited setup, then make 'name' (descending) the
    first ordering criterion of the model.
    """
    super(OnlineSource, cls).__setup__()
    name_order = ('name', 'DESC')
    cls._order.insert(0, name_order)
@classmethod
def default_query_method(cls):
    """Return the default query method: 'web'."""
    return 'web'
@classmethod
def default_url(cls):
    """Return the default URL: only the scheme prefix 'https://'."""
    return 'https://'
2022-11-20 19:40:51 +00:00
@classmethod
def default_rgxdate(cls):
    """Return the default date regex, matching a date written as
    dd.mm.yyyy.
    """
    return '(\\d{2}\\.\\d{2}\\.\\d{4})'
2022-11-18 23:21:52 +00:00
@classmethod
def default_rgxdatefmt(cls):
    """Return the default date format: '%d.%m.%Y' (dd.mm.yyyy)."""
    return '%d.%m.%Y'
@classmethod
def default_rgxrate(cls):
    """Return the default rate regex, matching digits, a comma and
    digits (nn,nn).
    """
    return '(\\d+,\\d+)'
@classmethod
def default_rgxidtype(cls):
    """Return the default identifier type: 'isin'."""
    return 'isin'
@classmethod
def default_rgxdecimal(cls):
    """Return the default decimal separator: a comma."""
    return ','
2022-11-18 23:21:52 +00:00
@classmethod
def default_nohtml(cls):
    """Return the default of 'Remove HTML': enabled (True)."""
    return True
@fields.depends(*fields_check)
def on_change_nsin(self):
    """Run the test query with the current values after the NSIN
    test input changed.
    """
    self.call_online_source()
@fields.depends(*fields_check)
def on_change_isin(self):
    """Run the test query with the current values after the ISIN
    test input changed.
    """
    self.call_online_source()
@fields.depends(*fields_check)
def on_change_symbol(self):
    """Run the test query with the current values after the symbol
    test input changed.
    """
    self.call_online_source()
def on_change_with_fnddate(self, name=None):
    """Getter of the function field 'fnddate': always None.

    A value is only assigned by call_online_source() during a test.
    """
    return None
def on_change_with_fndrate(self, name=None):
    """Getter of the function field 'fndrate': always None.

    A value is only assigned by call_online_source() during a test.
    """
    return None
def on_change_with_fndident(self, name=None):
    """Getter of the function field 'fndident': always ''.

    A value is only assigned by call_online_source() during a test.
    """
    return ''
def on_change_with_http_state(self, name=None):
    """Getter of the function field 'http_state': always ''.

    A value is only assigned by call_online_source() during a test.
    The 'name' argument is unused; its default is None, like in all
    sibling getters (it was mistakenly True before).
    """
    return ''
2022-11-18 23:21:52 +00:00
def on_change_with_text(self, name=None):
    """Getter of the function field 'text': always ''.

    A value is only assigned by call_online_source() during a test.
    """
    return ''
def on_change_with_nsin(self, name=None):
    """Getter of the test input 'nsin': always ''."""
    return ''
def on_change_with_isin(self, name=None):
    """Getter of the test input 'isin': always ''."""
    return ''
def on_change_with_symbol(self, name=None):
    """Getter of the test input 'symbol': always ''."""
    return ''
2022-11-20 19:40:51 +00:00
@fields.depends('url', 'isin', 'nsin', 'symbol')
def on_change_with_used_url(self, name=None):
    """Return the URL used for the test request.

    The URL is built by get_url_with_parameter() from the current
    test inputs; None is returned while no URL is entered.
    """
    if not self.url:
        return None

    test_values = {
        'isin': self.isin,
        'nsin': self.nsin,
        'symbol': self.symbol,
    }
    return self.get_url_with_parameter(**test_values)
@classmethod
def get_query_methods(cls):
    """Return the selection list of the available query methods.

    Currently only 'web'; its label is the translated message
    'investment.msg_querytype_web'.
    """
    web_label = gettext('investment.msg_querytype_web')
    return [('web', web_label)]
2022-11-18 23:21:52 +00:00
@classmethod
def set_test_value(cls, record, name, value):
    """Setter of the test inputs (nsin / isin / symbol).

    Intentionally does nothing: the test values are not stored.
    """
    return None
@classmethod
def run_query_method(cls, osource, isin, nsin, symbol, debug=False):
    """Run the query method selected on 'osource' to retrieve data.

    Only the method 'web' is handled here (delegated to
    read_from_website()); for any other method None is returned.

    result: {
        'text': raw-text from query - for debug,
        'http_state': state of query,
        'date': date() if success,
        'rate': Decimal() if success,
        'code': identifier - isin/nsin/symbol
    }
    """
    OSourc = Pool().get('investment.source')

    method = getattr(osource, 'query_method', None)
    if method != 'web':
        return None

    return OSourc.read_from_website(
        osource, isin=isin, nsin=nsin, symbol=symbol, debug=debug)
2022-11-21 15:23:26 +00:00
def call_online_source(self):
    """Query the online source with the current test inputs and copy
    the outcome to the test output fields.

    Used to try out the settings; nothing is changed when the query
    method returns no result.
    """
    OSourc = Pool().get('investment.source')

    result = OSourc.run_query_method(
        self, self.isin, self.nsin, self.symbol, debug=True)
    if result is None:
        return

    # (field on this record, key in the query result)
    outputs = (
        ('text', 'text'),
        ('http_state', 'http_state'),
        ('fnddate', 'date'),
        ('fndrate', 'rate'),
        ('fndident', 'code'),
    )
    for field_name, key in outputs:
        setattr(self, field_name, result.get(key, None))
2022-11-21 15:23:26 +00:00
def get_url_with_parameter(self, isin=None, nsin=None, symbol=None):
    """Build the request URL from the URL template of this source.

    The placeholders $isin / ${isin}, $nsin / ${nsin} and
    $symbol / ${symbol} in 'self.url' are replaced by the given
    values; a value of None is inserted as empty string.

    Any other '$' in the URL (a literal dollar sign or an unknown
    placeholder) is left untouched instead of raising an error.

    Returns the URL, or None if no URL is set.
    """
    if not self.url:
        return None

    values = {
        'isin': isin if isin is not None else '',
        'nsin': nsin if nsin is not None else '',
        'symbol': symbol if symbol is not None else '',
    }
    # safe_substitute(): '$' is a legal character in URLs, a strict
    # substitute() would fail with ValueError/KeyError on such URLs
    return Template(self.url).safe_substitute(values)
2022-11-21 15:23:26 +00:00
2022-11-18 23:21:52 +00:00
@classmethod
def update_rate(cls, asset):
    """Read data from the online sources of 'asset' and store the
    result as a rate of the asset.

    The sources in 'asset.updtsources' are queried one after another:
    - a source without result (e.g. unknown query method) is skipped,
    - if the source has an identifier regex and the identifier found
      differs from the one of the asset, the result is discarded,
    - the first result with date and rate for which no rate of that
      date exists yet is stored,
    - if a rate for the found date already exists and that date is
      today, no further source is queried.

    Returns True if a rate was created, False if not, and None if the
    asset has no sources.
    """
    if not asset.updtsources:
        return None

    pool = Pool()
    Rate = pool.get('investment.rate')
    IrDate = pool.get('ir.date')

    # identifier type of the source --> attribute of the asset
    asset_attributes = {
        'isin': 'isin',
        'nsin': 'wkn',
        'symbol': 'secsymb',
    }

    for updtsource in asset.updtsources:
        rate_data = cls.run_query_method(
            updtsource,
            isin=asset.isin,
            nsin=asset.wkn,
            symbol=asset.secsymb,
        )
        if not rate_data:
            # run_query_method() gives None for a method it does not
            # handle - nothing to evaluate, try next source
            continue

        if updtsource.rgxident:
            # check result - same code?
            code = rate_data.get('code', None)
            if code:
                asset_code = getattr(
                    asset, asset_attributes[updtsource.rgxidtype])
                if (asset_code or '').lower() != code.lower():
                    # fail
                    logger.warning(
                        'update_rate: got wrong code "%s" - expected "%s"',
                        code, asset_code)
                    continue

        to_create = {
            'date': rate_data.get('date', None),
            'rate': rate_data.get('rate', None),
            'asset': asset.id,
        }
        if to_create['date'] is None or to_create['rate'] is None:
            continue

        # check if exists
        existing = Rate.search_count([
            ('asset.id', '=', asset.id),
            ('date', '=', to_create['date'])])
        if existing == 0:
            Rate.create([to_create])
            return True

        # if we got a record for today - stop
        # otherwise try next source
        if to_create['date'] == IrDate.today():
            break
    return False
2022-11-18 23:21:52 +00:00
def get_regex_result(self, html_text, field_name):
    """Run the regex stored in field 'field_name' on 'html_text' and
    convert the text found.

    The text of the first capture group is used, or the whole match
    if the regex has no group. Conversion by field:
    - 'rgxrate': Decimal; 'self.rgxdecimal' (',' or '.') is the
      decimal separator, the other of the two characters is dropped
      as thousands separator. If no separator is set, the text is
      converted as it is.
    - 'rgxdate': date, parsed with the format 'self.rgxdatefmt'.
    - any other field: the text itself.

    Returns None if the regex is empty, nothing was found, the
    capture group did not take part in the match, or the conversion
    failed.
    """
    rgxcode = getattr(self, field_name) or ''
    if not rgxcode:
        return None

    search_result = re.compile(rgxcode).search(html_text)
    if search_result is None:
        return None

    group_no = 1 if search_result.re.groups else 0
    result = search_result.group(group_no)
    if result is None:
        # optional group, e.g. '(...)?', that matched nothing
        return None

    if field_name == 'rgxrate':
        decimal_sep = self.rgxdecimal
        if decimal_sep in (',', '.'):
            thousands_sep = '.' if decimal_sep == ',' else ','
            result = result.replace(
                thousands_sep, '').replace(decimal_sep, '.')
        try:
            return Decimal(result)
        except InvalidOperation:
            # text found is not a number
            return None

    if field_name == 'rgxdate':
        try:
            return datetime.strptime(result, self.rgxdatefmt).date()
        except (ValueError, TypeError):
            # text does not fit the format / no format selected
            return None

    return result
2022-11-18 23:21:52 +00:00
@classmethod
def read_from_website(
        cls, updtsource, isin=None, nsin=None,
        symbol=None, debug=False):
    """Download the page of the source and extract rate, date and
    identifier with the regexes of the source.

    The URL comes from updtsource.get_url_with_parameter(); the
    request follows redirects and times out after 5 seconds. With
    'nohtml' enabled the HTML is converted to plain text before the
    regexes are applied.

    Returns a dict with these keys (not all are always present):
    - 'text': 'invalid url' if no usable URL is set; with debug=True
      the text the regexes were applied to, or the response body /
      error message on failure
    - 'http_state': '<status code>: <reason>', or 'failed: <error>'
      if the request itself failed
    - 'rate', 'date', 'code': values found (None if not found), only
      for HTTP status 200 or 204

    A failed request (connection error, timeout, ...) is logged and
    reported in the result instead of being raised, so the caller can
    go on with the next source.
    """
    result = {}

    # unset, or still the bare default - nothing to request
    if not updtsource.url or updtsource.url == 'https://':
        result['text'] = 'invalid url'
        return result

    url = updtsource.get_url_with_parameter(
        isin=isin,
        nsin=nsin,
        symbol=symbol,
    )
    try:
        res1 = requests.get(url, allow_redirects=True, timeout=5.0)
    except requests.RequestException as exc:
        logger.error(
            'read_from_website: request failed, url: %s, error: %s',
            url, exc)
        result['http_state'] = 'failed: %s' % type(exc).__name__
        if debug:
            result['text'] = str(exc)
        return result

    result['http_state'] = '%(code)d: %(msg)s' % {
        'code': res1.status_code,
        'msg': res1.reason,
    }

    if res1.status_code not in [200, 204]:
        logger.error(
            'read_from_website: %s, url: %s, redirects: [%s]',
            res1.status_code,
            res1.url,
            ', '.join([x.url for x in res1.history]))
        if debug:
            result['text'] = res1.text
        return result

    html = res1.text

    # remove html-tags
    if updtsource.nohtml:
        converter = html2text.HTML2Text()
        converter.ignore_links = True
        converter.ignore_tables = True
        converter.bypass_tables = False
        converter.single_line_break = True
        converter.body_width = 0
        html = converter.handle(html)

    if debug:
        result['text'] = html

    result['rate'] = updtsource.get_regex_result(html, 'rgxrate')
    result['date'] = updtsource.get_regex_result(html, 'rgxdate')
    result['code'] = updtsource.get_regex_result(html, 'rgxident')
    return result
# end OnlineSource