# -*- coding: utf-8 -*-
# This file is part of the investment-module from m-ds for Tryton.
# The COPYRIGHT file at the top level of this repository contains the
# full copyright notices and license terms.
|
import requests, logging, html2text
|
|
from trytond.model import ModelView, ModelSQL, fields, Unique, Check
|
|
from trytond.transaction import Transaction
|
|
from trytond.pool import Pool
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class OnlineSource(ModelSQL, ModelView):
    'Online Source'
    # NOTE(review): the one-line docstring above is kept verbatim; Tryton
    # presumably uses it as the model's description - confirm before editing.
    #
    # A record describes a web page (name + URL) whose content is downloaded
    # by read_from_website(), optionally with the HTML markup removed.
    __name__ = 'investment.source'

    name = fields.Char(string='Name', required=True)
    url = fields.Char(string='URL', required=True)
    nohtml = fields.Boolean(string='Remove HTML',
        help='Removes HTML tags before the text is interpreted.')

    # Fields to test requests from the form. They are function fields whose
    # getters always return '' and whose setter stores nothing, so their
    # values only live in the client while the form is edited.
    nsin = fields.Function(fields.Char(string='NSIN'),
        'on_change_with_nsin', setter='set_test_value')
    isin = fields.Function(fields.Char(string='ISIN'),
        'on_change_with_isin', setter='set_test_value')
    symbol = fields.Function(fields.Char(string='Symbol'),
        'on_change_with_symbol', setter='set_test_value')
    # Shows the downloaded text of the last test request.
    text = fields.Function(fields.Text(string='Result',
        readonly=True), 'on_change_with_text')

    def call_online_source(self):
        """ run a test request with the current (unsaved) values

        Calls read_from_website() in debug mode for this record and
        copies the returned text into the 'text' field; 'text' becomes
        None when the result contains no text.
        """
        OSourc = Pool().get('investment.source')

        result = OSourc.read_from_website(self, debug=True)
        self.text = result.get('text', None)

    @classmethod
    def default_nohtml(cls):
        """ default for 'nohtml': True, HTML is removed
        """
        return True

    @fields.depends('nsin', 'isin', 'symbol', 'text')
    def on_change_nsin(self):
        """ NSIN was edited: run the test request
        """
        self.call_online_source()

    @fields.depends('nsin', 'isin', 'symbol', 'text')
    def on_change_isin(self):
        """ ISIN was edited: run the test request
        """
        self.call_online_source()

    @fields.depends('nsin', 'isin', 'symbol', 'text')
    def on_change_symbol(self):
        """ symbol was edited: run the test request
        """
        self.call_online_source()

    def on_change_with_text(self, name=None):
        """ getter of 'text': always an empty string
        """
        return ''

    def on_change_with_nsin(self, name=None):
        """ getter of 'nsin': always an empty string
        """
        return ''

    def on_change_with_isin(self, name=None):
        """ getter of 'isin': always an empty string
        """
        return ''

    def on_change_with_symbol(self, name=None):
        """ getter of 'symbol': always an empty string
        """
        return ''

    @classmethod
    def set_test_value(cls, record, name, value):
        """ setter of the test fields: intentionally stores nothing
        """
        pass

    @classmethod
    def update_rate(cls, asset):
        """ read data from the online source of an asset

        Does nothing when 'asset.updtsource' is None.
        Returns None in every case.
        """
        if asset.updtsource is None:
            return

        # NOTE(review): the downloaded data is not used yet; the call is
        # kept so the request (and its error logging) still takes place.
        cls.read_from_website(asset.updtsource)

    @classmethod
    def cleanup_spaces(cls, text):
        """ collapse repeated whitespace

        Tabs become spaces, runs of spaces shrink to a single space,
        CR/LF pairs (in either order) become a line feed and runs of
        line feeds shrink to a single one.

        text: string to clean
        returns: the cleaned string
        """
        previous_length = -1
        # Every replacement that fires shortens the text (tab -> space is
        # the only exception and is complete after the first pass), so the
        # loop is finished once a pass leaves the length unchanged.
        while previous_length != len(text):
            previous_length = len(text)
            text = text.replace('\t', ' ').replace('  ', ' ')
            text = text.replace('\r\n', '\n').replace('\n\r', '\n')
            text = text.replace('\n\n', '\n')
        return text

    @classmethod
    def read_from_website(cls, updtsource, debug=False, timeout=30):
        """ download the page of an online source

        updtsource: record providing 'url' and 'nohtml'
        debug: if True, the text is returned in the result
        timeout: seconds to wait for the server, passed to requests.get()

        returns: dict; it has the key 'text' only when 'debug' is set.
            For a response with status 200 this is the whitespace-cleaned
            body (converted by html2text, links ignored, when
            'updtsource.nohtml' is set); for any other status it is the
            unmodified response body, which is also written to the
            error log.
        Exceptions raised by requests.get() (connection errors,
        timeouts, ...) are not caught here.
        """
        result = {}

        response = requests.get(updtsource.url, timeout=timeout)

        # Compare the numeric status: the reason phrase is free text
        # chosen by the server and may differ from 'OK' or be missing.
        if response.status_code != requests.codes.ok:
            logger.error('read_from_website: %s', response.text)
            if debug:
                result['text'] = response.text
            return result

        page = cls.cleanup_spaces(response.text)

        # remove html
        if updtsource.nohtml:
            converter = html2text.HTML2Text()
            converter.ignore_links = True
            page = converter.handle(page)

        if debug:
            result['text'] = page
        return result

# end OnlineSource
|