cashbook_dataexchange/qiftool.py

463 lines
17 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
# This file is part of the cashbook-module from m-ds for Tryton.
# The COPYRIGHT file at the top level of this repository contains the
# full copyright notices and license terms.
from trytond.pool import Pool
from trytond.model import Model
2022-09-01 12:48:04 +00:00
from trytond.i18n import gettext
2022-09-05 08:13:20 +00:00
from trytond.report import Report
2022-09-01 12:48:04 +00:00
from decimal import Decimal
from datetime import datetime
class QifTool(Model):
'QIF Tool'
__name__ = 'cashbook_dataexchange.qiftool'
@classmethod
def split_by_type(cls, qifdata):
    """Group the lines of a QIF text by their '!Type:' section.

    Every line after a '!Type:<name>' header is stripped and collected
    under '<name>'; lines in front of the first header are discarded.
    A section name that occurs several times accumulates its lines
    under a single key.

    Returns a dict mapping section name -> section text, where the
    text is the collected lines joined with a newline.
    """
    marker = '!Type:'
    collected = {}
    section = None
    for raw in qifdata.split('\n'):
        if raw.startswith(marker):
            section = raw[len(marker):].strip()
        elif section is not None:
            collected.setdefault(section, []).append(raw.strip())
    return {name: '\n'.join(rows) for name, rows in collected.items()}
2022-09-01 12:48:04 +00:00
@classmethod
def get_amount_from_txt(cls, amount_txt):
    """Convert an amount string into a Decimal.

    Both '1,234.56' ('.' decimal, ',' thousands) and '1.234,56'
    (',' decimal, '.' thousands) are accepted. When both separators
    occur, the one that appears last is taken as the decimal
    separator and the other one is removed. A text that contains only
    ',' treats it as the decimal separator.

    Raises decimal.InvalidOperation if the normalized text is not a
    valid number.
    """
    last_comma = amount_txt.rfind(',')
    last_dot = amount_txt.rfind('.')

    if (last_comma >= 0) and (last_dot >= 0):
        if last_dot > last_comma:
            # '.' = decimal, ',' = thousands --> drop the thousands sign
            amount_txt = amount_txt.replace(',', '')
        else:
            # ',' = decimal, '.' = thousands
            amount_txt = amount_txt.replace('.', '').replace(',', '.')
    elif last_comma >= 0:
        # only ',' present: decimal comma
        amount_txt = amount_txt.replace(',', '.')
    return Decimal(amount_txt)
@classmethod
def qif_read_transactions(cls, transactiondata):
    """Read transactions from the text of a QIF section.

    Records are separated by '^'. Each non-empty line starts with a
    one-character field code followed by its value. Blank lines and
    leading/trailing whitespace of a line are ignored. Dates must be
    in the format DD.MM.YYYY. The split codes 'E' and '$'/'£' refer
    to the most recent 'S' line and therefore must follow one.

    result: [{
        'split': [{
            'amount': <Decimal>,
            'description': 'purpose',
            'category': 'name of category',
            },...],
        'date': <date of transaction>,
        'amount': <Decimal>,
        'party': 'name of party',
        'address': 'address of party',
        'checknumber': 'number',
        'description': 'purpose',
        'state': 'check|edit',
        'account': 'name of cashbook',
        'category': 'name of category',
        }, ...]
    Only 'split' is always present; the other keys appear when the
    record contains the matching line.

    Raises ValueError for a line with an unknown field code.
    """
    # codes whose value is stored unchanged under the given key
    text_fields = {
        'P': 'party',
        'A': 'address',
        'N': 'checknumber',
        'M': 'description',
    }
    states = {
        'X': 'check',
        '*': 'edit',
    }

    result = []
    for booktxt in transactiondata.split('^'):
        if not booktxt.strip():
            continue

        booking = {'split': []}
        for raw_line in booktxt.strip().split('\n'):
            line = raw_line.strip()
            if not line:
                # tolerate empty lines inside a record
                continue

            code = line[0]
            line_txt = line[1:].strip()
            if code == 'D':     # date
                booking['date'] = datetime.strptime(
                    line_txt, '%d.%m.%Y').date()
            elif code in ('T', 'U'):    # total
                booking['amount'] = cls.get_amount_from_txt(line_txt)
            elif code in text_fields:   # party, address, number, memo
                booking[text_fields[code]] = line_txt
            elif code == 'C':   # state, anything unknown means 'edit'
                booking['state'] = states.get(line_txt, 'edit')
            elif code == 'L':   # category or [account]
                if line_txt.startswith('[') and line_txt.endswith(']'):
                    booking['account'] = line_txt[1:-1]
                else:
                    booking['category'] = line_txt
            elif code == 'S':   # split: category, opens a new split
                booking['split'].append({
                    'category': line_txt,
                    })
            elif code == 'E':   # split: memo
                booking['split'][-1]['description'] = line_txt
            elif code in ('$', '£'):    # split: amount
                booking['split'][-1]['amount'] = \
                    cls.get_amount_from_txt(line_txt)
            else:
                raise ValueError('unknown line-code: %s' % (raw_line))
        result.append(booking)
    return result
2022-09-05 08:13:20 +00:00
@classmethod
def qif_export_book(cls, book):
    """Render all lines of a cashbook as a QIF '!Type:Bank' section.

    For each line of 'book.lines' a record is written with date,
    signed total and state, followed - when set - by party and
    address, category, transfer account, memo and the split lines.
    Every record is closed with '^'.

    Returns the records joined with a newline.
    Raises ValueError for a line with an unknown booking type.
    """
    incoming = ['in', 'spin', 'mvin']
    outgoing = ['out', 'spout', 'mvout']

    def signed_amount(value, bookline):
        """ apply the sign that belongs to the booking type of the line
        """
        if bookline.bookingtype in incoming:
            return value
        if bookline.bookingtype in outgoing:
            return value * Decimal('-1.0')
        raise ValueError('invalid bookingtype: %s' % bookline.bookingtype)

    rows = ['!Type:Bank']
    for line in book.lines:
        # date
        date_txt = Report.format_date(line.date, None)
        rows.append('D%(date)s' % {'date': date_txt})

        # total
        total_txt = Report.format_number(
            signed_amount(line.amount, line), None)
        rows.append('T%(total)s' % {'total': total_txt})

        # state
        state_code = 'X' if line.state in ['check', 'done'] else '*'
        rows.append('C%(state)s' % {'state': state_code})

        # party + address
        if line.party:
            rows.append('P%(party)s' % {'party': line.party.rec_name})

            p_address = line.party.address_get()
            if p_address and len(p_address.full_address.strip()) > 0:
                one_line = p_address.full_address.replace('\n', ', ')
                rows.append('A%(address)s' % {'address': one_line.strip()})

        # category
        if line.category:
            cat_name = line.category.rec_name.replace('/', ':')
            rows.append('L%(category)s' % {'category': cat_name})

        # account
        if line.booktransf:
            rows.append('L[%(account)s]' % {'account': line.booktransf.name})

        # description
        if line.description:
            rows.append('M%(memo)s' % {
                'memo': line.description.replace('\n', '; ')})

        # split-booking, the sign is taken from the parent line
        for splitline in line.splitlines:
            split_cat = splitline.category.rec_name.replace('/', ':')
            rows.append('S%(category)s' % {'category': split_cat})
            if splitline.description:
                rows.append('E%(memo)s' % {
                    'memo': splitline.description.replace('\n', '; ')})
            split_total = Report.format_number(
                signed_amount(splitline.amount, line), None)
            rows.append('$%(total)s' % {'total': split_total})
        rows.append('^')
    return '\n'.join(rows)
@classmethod
def get_party_by_name(cls, partyname):
    """Look up a party whose rec_name contains 'partyname'.

    Returns a tuple (party_id, msg_txt):
    - no match: (None, 'not found' message)
    - one match: (id of the party, None)
    - several matches: (id of the first match, 'many found' message)
    """
    Party = Pool().get('party.party')

    matches = Party.search([('rec_name', 'ilike', '%%%s%%' % partyname)])
    if len(matches) == 0:
        return (None, gettext(
            'cashbook_dataexchange.mds_import_party_notfound',
            pname=partyname,
            ))

    first = matches[0]
    if len(matches) == 1:
        return (first.id, None)

    # ambiguous name: use the first hit and tell the user about it
    warning = gettext(
        'cashbook_dataexchange.mds_import_many_parties_found',
        pname=partyname,
        pname2=first.rec_name,
        )
    return (first.id, warning)
2022-09-01 12:48:04 +00:00
@classmethod
def get_category_by_name(cls, catname, cattype):
    """Look up a category of type 'cattype' by its QIF name.

    The ':' separator of the QIF name is converted to '/' before the
    exact match on rec_name.

    Returns a tuple (cat_id, msg_txt):
    - one match: (id of the category, None)
    - no match: (None, 'not found' message)
    - several matches: (id of the first match, 'many found' message)
    """
    Category = Pool().get('cashbook.category')

    found = Category.search([
        ('cattype', '=', cattype),
        ('rec_name', '=', catname.replace(':', '/')),
        ])
    hits = len(found)

    if hits == 1:
        return (found[0].id, None)

    if hits == 0:
        return (None, gettext(
            'cashbook_dataexchange.mds_import_category_notfound',
            catname=catname,
            cattype=cattype,
            ))

    # ambiguous name: use the first hit and tell the user about it
    warning = gettext(
        'cashbook_dataexchange.mds_import_many_categories_found',
        catname1=catname,
        catname2=found[0].rec_name,
        cattype=cattype,
        )
    return (found[0].id, warning)
@classmethod
def convert_categories_to_create(cls, cat_tree):
    """Build create-values for the categories that do not exist yet.

    cat_tree: result from cls.qif_read_categories()

    The tree is walked for the types 'in' and 'out'. A category that
    already exists is not created again, only its children are
    checked below it. A missing category is emitted together with its
    complete subtree as nested ('create', [...]) commands.

    Returns a list of value-dicts for the create of the category model.
    """
    Category = Pool().get('cashbook.category')

    def build(ctype, catdict, parent, do_search):
        """ collect create-values for one level of the tree;
            with do_search=False everything is treated as missing
        """
        pending = []
        for catname, node in catdict.items():
            existing = []
            if do_search:
                if parent is None:
                    parent_clause = ('parent', '=', None)
                else:
                    parent_clause = ('parent.id', '=', parent.id)
                existing = Category.search([
                    ('cattype', '=', ctype),
                    ('name', '=', catname),
                    parent_clause,
                    ])

            children = node['childs']
            if len(existing) > 0:
                # category is known, continue the search below it
                if len(children) > 0:
                    pending.extend(build(ctype, children, existing[0], True))
                continue

            values = {
                'cattype': ctype,
                'name': catname,
                }
            if parent is not None:
                values['parent'] = parent.id
            if len(children) > 0:
                # below a new category nothing can exist: no search
                sub_values = build(ctype, children, None, False)
                if len(sub_values) > 0:
                    values['childs'] = [('create', sub_values)]
            pending.append(values)
        return pending

    return [
        values
        for typ1 in ['in', 'out']
        for values in build(typ1, cat_tree[typ1], None, True)
    ]
2022-09-01 15:13:55 +00:00
@classmethod
def convert_parties_to_create(cls, transactions):
    """Build create-values for the parties that do not exist yet.

    transactions: list of dicts as returned by
    cls.qif_read_transactions()

    Each distinct party name is checked once. If no party has a
    rec_name containing that name, a value-dict with the name and one
    address is emitted; the street is taken from the 'address' of the
    first transaction using the name (None if it has none).

    Returns a list of value-dicts for the create of the party model.
    """
    Party = Pool().get('party.party')

    to_create = []
    # a set keeps the duplicate check cheap for large imports
    seen_names = set()
    for transaction in transactions:
        if 'party' not in transaction:
            continue

        pname = transaction['party']
        if pname in seen_names:
            continue
        seen_names.add(pname)

        existing = Party.search_count([
            ('rec_name', 'ilike', '%%%(pname)s%%' % {
                'pname': pname,
                }),
            ])
        if existing == 0:
            to_create.append({
                'name': pname,
                'addresses': [('create', [{
                    'street': transaction.get('address', None),
                    }])],
                })
    return to_create
2022-09-01 12:48:04 +00:00
@classmethod
def convert_transactions_to_create(cls, transactions, split2edit=True):
    """Convert parsed transactions into create-values for cashbook lines.

    transactions: list of dicts as returned by
    cls.qif_read_transactions()
    split2edit: True = split-bookings get the state 'edit',
    False = state is not changed

    The sign of the amount selects the booking type ('in'/'out', or
    'spin'/'spout' for split-bookings); the amount itself is stored
    without sign. Party and category are resolved by name. A
    transaction whose category is not found is skipped. Address and
    check-number are appended to the description.

    Returns a tuple (to_create, msg_list, fail_cnt): the value-dicts,
    the messages collected while resolving names, and the number of
    parties/categories that could not be resolved.
    """
    to_create = []
    msg_list = []
    fail_cnt = 0
    copied_fields = ['date', 'amount', 'description', 'state']

    for transaction in transactions:
        line = {
            x: transaction[x] for x in copied_fields if x in transaction}
        splits = transaction.get('split', [])

        is_incoming = line['amount'] >= Decimal('0.0')
        if len(splits) > 0:
            line['bookingtype'] = 'spin' if is_incoming else 'spout'
        else:
            line['bookingtype'] = 'in' if is_incoming else 'out'
        # the direction is stored in the booking type, not in the sign
        line['amount'] = line['amount'].copy_sign(Decimal('1.0'))

        # party
        if 'party' in transaction:
            (party_id, msg_txt) = cls.get_party_by_name(
                transaction['party'])
            if party_id is not None:
                line['party'] = party_id
            else:
                fail_cnt += 1
            if msg_txt is not None:
                msg_list.append(msg_txt)

        # store 'account' like 'category'
        cat_name = transaction.get(
            'category', transaction.get('account', None))
        cat_type = 'in' if line['bookingtype'] in ['in', 'spin'] else 'out'
        if cat_name is not None:
            (cat_id, msg_txt) = cls.get_category_by_name(cat_name, cat_type)
            if cat_id is None:
                # without category the line cannot be created
                msg_list.append(msg_txt)
                fail_cnt += 1
                continue
            line['category'] = cat_id
            if msg_txt is not None:
                msg_list.append(msg_txt)

        # address + check-number have no field, keep them in the memo
        for x in ['address', 'checknumber']:
            if x in transaction:
                extra = '%s %s' % (
                    gettext('cashbook_dataexchange.mds_import_%s' % x),
                    transaction[x],
                    )
                current = line.get('description', '')
                # no leading ', ' if there is no memo so far
                if current:
                    line['description'] = ', '.join([current, extra])
                else:
                    line['description'] = extra

        split_lines = []
        for sp_line in splits:
            (cat_id, msg_txt) = cls.get_category_by_name(
                sp_line['category'], cat_type)
            if msg_txt is not None:
                msg_list.append(msg_txt)

            if cat_id is not None:
                split_lines.append({
                    'amount': sp_line['amount'].copy_sign(Decimal('1.0')),
                    'description': sp_line.get('description', None),
                    'category': cat_id,
                    })
            else:
                fail_cnt += 1

        if len(split_lines) > 0:
            line['splitlines'] = [('create', split_lines)]

        if split2edit and ('splitlines' in line):
            line['state'] = 'edit'
        to_create.append(line)
    return (to_create, msg_list, fail_cnt)
2022-09-01 12:48:04 +00:00
@classmethod
def qif_read_categories(cls, catdata):
    """Read categories from the text of a QIF category section.

    Records are separated by '^'. Within a record 'N' holds the name
    (levels separated by ':'), 'I' marks an income and 'E' an expense
    category. The QIF codes 'D' (description), 'T' (tax related),
    'R' (tax schedule) and 'B' (budget) as well as blank lines are
    ignored.

    result: {
        'in': {
            '<Category-Name>': {
                'type': 'in|out',
                'childs': {'<Category-Name>': {...}, ...},
                },
            ...},
        'out': {...},
        }

    Raises ValueError for a line with an unknown code and for a
    record without name or without type.
    """
    def add_category(catdict, namelst, ctype):
        """ add category (and its sub-levels) to dict
        """
        node = catdict.setdefault(
            namelst[0], {'type': ctype, 'childs': {}})
        if len(namelst) > 1:
            add_category(node['childs'], namelst[1:], ctype)
        return catdict

    ignored_codes = ('D', 'T', 'R', 'B')

    categories = {'in': {}, 'out': {}}
    for cattxt in catdata.split('^'):
        if not cattxt.strip():
            continue

        catname = None
        cattype = None
        for raw_line in cattxt.strip().split('\n'):
            line = raw_line.strip()
            if not line:
                continue

            if line.startswith('N'):
                catname = line[1:].strip().split(':')
            elif line.startswith('E'):
                cattype = 'out'
            elif line.startswith('I'):
                cattype = 'in'
            elif line.startswith(ignored_codes):
                # valid QIF information that is not imported
                continue
            else:
                raise ValueError(
                    'invalid line: %s (%s)' % (raw_line, cattxt))

        if (catname is None) or (cattype is None):
            raise ValueError(
                'category without name or type: (%s)' % cattxt)
        add_category(categories[cattype], catname, cattype)
    return categories
2022-08-31 15:32:01 +00:00
@classmethod
def qif_export_category(cls, record):
    """Render a single category as QIF record.

    The record consists of the name line ('/' of rec_name converted
    to ':'), the type line ('E' for cattype 'out', otherwise 'I') and
    the closing '^', joined with a newline.
    """
    qif_name = record.rec_name.replace('/', ':')
    if record.cattype == 'out':
        type_code = 'E'
    else:
        type_code = 'I'

    rows = [
        'N%(cname)s' % {'cname': qif_name},
        type_code,
        '^',
    ]
    return '\n'.join(rows)
# end QifTool