cashbook_dataexchange/qiftool.py

632 lines
24 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
# This file is part of the cashbook-module from m-ds for Tryton.
# The COPYRIGHT file at the top level of this repository contains the
# full copyright notices and license terms.
from trytond.pool import Pool
from trytond.model import Model
2022-09-01 12:48:04 +00:00
from trytond.i18n import gettext
2022-09-05 08:13:20 +00:00
from trytond.report import Report
2022-09-01 12:48:04 +00:00
from decimal import Decimal
from datetime import datetime
class QifTool(Model):
'QIF Tool'
__name__ = 'cashbook_dataexchange.qiftool'
@classmethod
def split_by_type(cls, qifdata):
""" split file-content by type
"""
lines = qifdata.split('\n')
blocks = {}
current_type = None
for line in lines:
if line.startswith('!Type:'):
current_type = line[len('!Type:'):].strip()
2023-06-06 13:22:57 +00:00
else:
if current_type is None:
continue
2023-06-06 13:22:57 +00:00
if current_type not in blocks.keys():
blocks[current_type] = []
blocks[current_type].append(line.strip())
for block in blocks.keys():
blocks[block] = '\n'.join(blocks[block])
return blocks
2022-09-01 12:48:04 +00:00
@classmethod
def get_amount_from_txt(cls, amount_txt):
""" convert text to Decimal
"""
if (',' in amount_txt) and (amount_txt[-3] == '.'):
# '.' = decimal, ',' = tousand
amount_txt = amount_txt.replace(',', '.')
elif ('.' in amount_txt) and (amount_txt[-3] == ','):
# ',' = decimal, '.' = tousand
amount_txt = amount_txt.replace('.', '')
amount_txt = amount_txt.replace(',', '.')
elif ',' in amount_txt:
amount_txt = amount_txt.replace(',', '.')
return Decimal(amount_txt)
@classmethod
def qif_read_transactions(cls, transactiondata):
""" read transactions from text
result: [{
'split': [{
'amount': <Decimal>,
'description': 'purpose',
'category': 'name of categroy',
},...],
'date': <date of transaction>,
'amount': <Decimal>,
'party': 'name of party',
'address': 'address of party',
'checknumber': 'number',
'description': 'purpose',
'state': 'check|edit',
'account': 'name of cashbook',
'category': 'name of category',
}, ...]
"""
result = []
for booktxt in transactiondata.split('^'):
if len(booktxt.strip()) == 0:
continue
booking = {'split': []}
for line in booktxt.strip().split('\n'):
line_txt = line[1:].strip()
if line.startswith('D'): # date
2023-06-06 13:22:57 +00:00
booking['date'] = datetime.strptime(
line_txt, '%d.%m.%Y').date()
2022-09-01 12:48:04 +00:00
elif line.startswith('T'): # total
booking['amount'] = cls.get_amount_from_txt(line_txt)
elif line.startswith('U'): # total
booking['amount'] = cls.get_amount_from_txt(line_txt)
elif line.startswith('P'): # party
booking['party'] = line_txt
elif line.startswith('A'): # address
booking['address'] = line_txt
elif line.startswith('N'): # address
booking['checknumber'] = line_txt
elif line.startswith('M'): # memo
booking['description'] = line_txt
elif line.startswith('C'): # state
booking['state'] = {
'X': 'check',
'*': 'edit',
}.get(line_txt, 'edit')
elif line.startswith('L'): # category, account
if line_txt.startswith('[') and line_txt.endswith(']'):
booking['account'] = line_txt[1:-1]
2023-06-06 13:22:57 +00:00
else:
2022-09-01 12:48:04 +00:00
booking['category'] = line_txt
elif line.startswith('S'): # split: category, account
if line_txt.startswith('[') and line_txt.endswith(']'):
booking['split'].append({
'account': line_txt[1:-1],
})
2023-06-06 13:22:57 +00:00
else:
booking['split'].append({
'category': line_txt,
})
2022-09-01 12:48:04 +00:00
elif line.startswith('E'): # split: memo
booking['split'][-1]['description'] = line_txt
elif line.startswith('$'): # split: amount
2023-06-06 13:22:57 +00:00
booking['split'][-1]['amount'] = \
cls.get_amount_from_txt(line_txt)
2022-09-01 12:48:04 +00:00
elif line.startswith('£'): # split: amount
2023-06-06 13:22:57 +00:00
booking['split'][-1]['amount'] = \
cls.get_amount_from_txt(line_txt)
else:
2022-09-01 12:48:04 +00:00
raise ValueError('unknown line-code: %s' % (line))
result.append(booking)
return result
2022-09-05 08:13:20 +00:00
@classmethod
def qif_export_book(cls, book):
""" export book
"""
result = ['!Type:Bank']
def get_amount_by_bookingstate(amount, line):
""" get amount with sign
"""
if line.bookingtype in ['in', 'spin', 'mvin']:
return amount
elif line.bookingtype in ['out', 'spout', 'mvout']:
return amount * Decimal('-1.0')
2023-06-06 13:22:57 +00:00
else:
2022-09-05 08:13:20 +00:00
raise ValueError('invalid bookingtype: %s' % line.bookingtype)
for line in book.lines:
# date
result.append('D%(date)s' % {
'date': Report.format_date(line.date, None),
})
# total
result.append('T%(total)s' % {
2023-06-06 13:22:57 +00:00
'total': Report.format_number(
get_amount_by_bookingstate(line.amount, line), None),
2022-09-05 08:13:20 +00:00
})
# state
result.append('C%(state)s' % {
'state': 'X' if line.state in ['check', 'done'] else '*',
})
# party
if line.party:
result.append('P%(party)s' % {
'party': line.party.rec_name,
})
# address
p_address = line.party.address_get()
if p_address:
if len(p_address.full_address.strip()) > 0:
result.append('A%(address)s' % {
2023-06-06 13:22:57 +00:00
'address': p_address.full_address.replace(
'\n', ', ').strip()})
2022-09-05 08:13:20 +00:00
# category
if line.category:
result.append('L%(category)s' % {
'category': line.category.rec_name.replace('/', ':'),
})
# account
if line.booktransf:
result.append('L[%(account)s]' % {
'account': line.booktransf.name,
})
# description
if line.description:
result.append('M%(memo)s' % {
'memo': line.description.replace('\n', '; ')
})
# split-booking
for splitline in line.splitlines:
result.append('S%(category)s' % {
'category': splitline.category.rec_name.replace('/', ':'),
})
if splitline.description:
result.append('E%(memo)s' % {
'memo': splitline.description.replace('\n', '; ')
})
result.append('$%(total)s' % {
2023-06-06 13:22:57 +00:00
'total': Report.format_number(
get_amount_by_bookingstate(
splitline.amount, line), None),
2022-09-05 08:13:20 +00:00
})
result.append('^')
return '\n'.join(result)
@classmethod
def get_party_by_name(cls, partyname):
""" find party
"""
Party = Pool().get('party.party')
party_id = None
msg_txt = None
parties = Party.search([('rec_name', 'ilike', '%%%s%%' % partyname)])
if len(parties) == 0:
msg_txt = gettext(
'cashbook_dataexchange.mds_import_party_notfound',
2023-06-06 13:22:57 +00:00
pname=partyname)
elif len(parties) == 1:
party_id = parties[0].id
2023-06-06 13:22:57 +00:00
else:
party_id = parties[0].id
msg_txt = gettext(
'cashbook_dataexchange.mds_import_many_parties_found',
2023-06-06 13:22:57 +00:00
pname=partyname,
pname2=parties[0].rec_name)
return (party_id, msg_txt)
2022-09-01 12:48:04 +00:00
@classmethod
def get_account_by_name(cls, book, account_name):
""" find cashbook
"""
Book = Pool().get('cashbook.book')
book_obj = None
msg_txt = None
books = Book.search([
('name', '=', account_name),
('owner.id', '=', book.owner.id),
('id', '!=', book.id),
])
if len(books) == 1:
book_obj = books[0]
elif len(books) == 0:
msg_txt = gettext(
'cashbook_dataexchange.mds_import_book_notfound',
2023-06-06 13:22:57 +00:00
bookname=account_name,
)
2023-06-06 13:22:57 +00:00
else:
msg_txt = gettext(
'cashbook_dataexchange.mds_import_many_books_found',
2023-06-06 13:22:57 +00:00
bookname1=account_name,
bookname2=books[0].rec_name,
)
book_obj = books[0]
return (book_obj, msg_txt)
@classmethod
def get_category_by_name(cls, company, catname):
2022-09-01 12:48:04 +00:00
""" find category
"""
Category = Pool().get('cashbook.category')
cat_obj = None
2022-09-01 12:48:04 +00:00
msg_txt = None
categories = Category.search([
('rec_name', '=', catname.replace(':', '/')),
('company.id', '=', company.id),
2022-09-01 12:48:04 +00:00
])
if len(categories) == 1:
cat_obj = categories[0]
2022-09-01 12:48:04 +00:00
elif len(categories) == 0:
msg_txt = gettext(
'cashbook_dataexchange.mds_import_category_notfound',
2023-06-06 13:22:57 +00:00
catname=catname,
2022-09-01 12:48:04 +00:00
)
2023-06-06 13:22:57 +00:00
else:
2022-09-01 12:48:04 +00:00
msg_txt = gettext(
'cashbook_dataexchange.mds_import_many_categories_found',
2023-06-06 13:22:57 +00:00
catname1=catname,
catname2='%(name)s [%(type)s]' % {
'name': categories[0].rec_name,
'type': categories[0].cattype,
},
2022-09-01 12:48:04 +00:00
)
cat_obj = categories[0]
return (cat_obj, msg_txt)
2022-09-01 12:48:04 +00:00
@classmethod
def convert_categories_to_create(cls, cat_tree):
""" cat_tree: result from cls.qif_read_categories()
"""
Category = Pool().get('cashbook.category')
def get_create(ctype, catdict, parent, do_search):
""" check if category exists, generate create-data
"""
result = []
for catname in catdict.keys():
2023-06-06 13:22:57 +00:00
if do_search is True:
c_lst = Category.search([
2023-06-06 13:22:57 +00:00
('cattype', '=', ctype),
('name', '=', catname),
('parent', '=', None)
if parent is None else ('parent.id', '=', parent.id)])
else:
c_lst = []
if len(c_lst) == 0:
cat1 = {
'cattype': ctype,
'name': catname,
}
if parent is not None:
cat1['parent'] = parent.id
if len(catdict[catname]['childs']) > 0:
2023-06-06 13:22:57 +00:00
childs = get_create(
ctype, catdict[catname]['childs'], None, False)
if len(childs) > 0:
cat1['childs'] = [('create', childs)]
result.append(cat1)
2023-06-06 13:22:57 +00:00
else:
if len(catdict[catname]['childs']) > 0:
2023-06-06 13:22:57 +00:00
result.extend(get_create(
ctype, catdict[catname]['childs'], c_lst[0], True))
return result
to_create = []
for typ1 in ['in', 'out']:
to_create.extend(get_create(typ1, cat_tree[typ1], None, True))
return to_create
2022-09-01 15:13:55 +00:00
@classmethod
def convert_parties_to_create(cls, transactions):
""" extract party from transaction, check if exist,
create 'to_create'
"""
Party = Pool().get('party.party')
to_create = []
party_cache = []
for transaction in transactions:
if 'party' in transaction.keys():
if transaction['party'] in party_cache:
continue
party_cache.append(transaction['party'])
if Party.search_count([
('rec_name', 'ilike', '%%%(pname)s%%' % {
'pname': transaction['party'],
2023-06-06 13:22:57 +00:00
})]) == 0:
2022-09-01 15:13:55 +00:00
to_create.append({
'name': transaction['party'],
'addresses': [('create', [{
'street': transaction.get('address', None),
}])],
})
return to_create
2022-09-01 12:48:04 +00:00
@classmethod
2022-09-13 09:29:43 +00:00
def check_counter_transaction(cls, book, line):
""" check if planned transaction was already inserted by
import to target-cashbook
"""
Line = Pool().get('cashbook.line')
if Line.search_count([
2023-06-06 13:22:57 +00:00
('cashbook.id', '=', book.id),
('booktransf.id', '=', line['booktransf']),
('date', '=', line['date']),
# ('description', '=', line['description']),
('amount', '=', line['amount']),
('bookingtype', '=', line['bookingtype']),
]) > 0:
return True
2023-06-06 13:22:57 +00:00
else:
return False
@classmethod
def get_category_account(cls, book, transaction):
""" get category or account
"""
cat_name = transaction.get('category', None)
account_name = transaction.get('account', None)
msg_list = []
fail_cnt = 0
category = account = None
if cat_name is not None:
2023-06-06 13:22:57 +00:00
(category, msg_txt) = cls.get_category_by_name(
book.company, cat_name)
if category is None:
msg_list.append(msg_txt)
fail_cnt += 1
elif account_name is not None:
(account, msg_txt) = cls.get_account_by_name(book, account_name)
if account is None:
msg_list.append(msg_txt)
fail_cnt += 1
return (category, account, msg_list, fail_cnt)
@classmethod
2023-06-06 13:22:57 +00:00
def convert_transactions_to_create(
cls, book, transactions, split2edit=True):
2022-09-01 12:48:04 +00:00
""" convert read transactions to create-command
split2edit: True = split-bookings are 'edit', False = dont change
2022-09-01 12:48:04 +00:00
"""
def updt_description(descr_txt):
""" repair line breaks
"""
if descr_txt is None:
return None
return descr_txt.replace('\\n', '\n')
2022-09-01 12:48:04 +00:00
to_create = []
msg_list = []
fail_cnt = 0
2022-09-01 12:48:04 +00:00
for transaction in transactions:
2023-06-06 13:22:57 +00:00
line = {
x: transaction[x]
for x in ['date', 'amount', 'description', 'state']
if x in transaction.keys()}
2022-09-01 12:48:04 +00:00
if 'description' in line.keys():
line['description'] = updt_description(line['description'])
2023-06-06 13:22:57 +00:00
(category, account, msg_lst2, fail_cnt2) = \
cls.get_category_account(book, transaction)
msg_list.extend(msg_lst2)
if fail_cnt2 > 0:
fail_cnt += fail_cnt2
continue
if category:
cat_type = category.cattype
line['category'] = category.id
if cat_type == 'out':
# amount of usually out-transaction are negative in QIF,
# if its a return-transaction it should be positive
line['amount'] = line['amount'].copy_negate()
line['bookingtype'] = cat_type
if len(transaction['split']) > 0:
line['bookingtype'] = {
'in': 'spin',
'out': 'spout',
}[cat_type]
elif account:
if line['amount'] < Decimal('0.0'):
line['bookingtype'] = 'mvout'
line['amount'] = line['amount'].copy_negate()
2023-06-06 13:22:57 +00:00
else:
line['bookingtype'] = 'mvin'
line['booktransf'] = account.id
descr_lst = [transaction.get('party', '-')]
if 'description' in line.keys():
descr_lst.append(line['description'])
if 'party' in transaction.keys():
del transaction['party']
line['description'] = '; '.join(descr_lst)
line['state'] = 'edit'
2023-06-06 13:22:57 +00:00
if cls.check_counter_transaction(book, line) is True:
# counter-transaction already exists
continue
2023-06-06 13:22:57 +00:00
else:
# transaction: no category, no account - ignore?
if line.get('amount', Decimal('0.0')) == Decimal('0.0'):
# no amount --> ignore!
2023-06-06 13:22:57 +00:00
tr_info = {'trdate': '-', 'amount': '-'}
if 'date' in transaction.keys():
2023-06-06 13:22:57 +00:00
tr_info['trdate'] = Report.format_date(
transaction['date'], None)
if 'amount' in transaction.keys():
2023-06-06 13:22:57 +00:00
tr_info['amount'] = Report.format_currency(
transaction['amount'],
None,
book.currency)
tr_info['descr'] = transaction.get('description', '-')
msg_list.append(gettext(
'cashbook_dataexchange.msg_ignore_null_booking',
2023-06-06 13:22:57 +00:00
trinfo='%(trdate)s, %(amount)s, %(descr)s' % tr_info,
))
continue
2022-09-01 12:48:04 +00:00
# party
if 'party' in transaction.keys():
2023-06-06 13:22:57 +00:00
(party_id, msg_txt) = cls.get_party_by_name(
transaction['party'])
if party_id is not None:
line['party'] = party_id
2023-06-06 13:22:57 +00:00
else:
fail_cnt += 1
if msg_txt is not None:
msg_list.append(msg_txt)
2022-09-01 12:48:04 +00:00
for x in ['address', 'checknumber']:
if x in transaction.keys():
line['description'] = ', '.join([
line.get('description', ''),
'%s %s' % (
gettext('cashbook_dataexchange.mds_import_%s' % x),
transaction[x]
),
])
split_lines = []
for sp_line in transaction['split']:
2023-06-06 13:22:57 +00:00
(category, account, msg_lst2, fail_cnt2) = \
cls.get_category_account(book, sp_line)
msg_list.extend(msg_lst2)
if fail_cnt2 > 0:
fail_cnt += fail_cnt2
continue
2022-09-01 12:48:04 +00:00
split_line = {
2023-06-06 13:22:57 +00:00
'amount': sp_line['amount']
if line['bookingtype'].endswith('in')
else sp_line['amount'].copy_negate(),
'description': updt_description(
sp_line.get('description', None)),
}
2022-09-01 12:48:04 +00:00
if category:
# category match to bookingtype?
2023-06-06 13:22:57 +00:00
if ((category.cattype == 'in') and
line['bookingtype'].endswith('out')) or \
((category.cattype == 'out') and
line['bookingtype'].endswith('in')):
msg_list.append(gettext(
2023-06-06 13:22:57 +00:00
'cashbook_dataexchange.' +
'mds_import_category_not_match',
catname='%s [%s]' % (
category.rec_name, category.cattype),
bktype=line['bookingtype'],
data=str(transaction)))
fail_cnt += 1
continue
split_line['splittype'] = 'cat'
split_line['category'] = category.id
elif account:
split_line['splittype'] = 'tr'
split_line['booktransf'] = account.id
2023-06-06 13:22:57 +00:00
else:
continue
split_lines.append(split_line)
2022-09-01 12:48:04 +00:00
if len(split_lines) > 0:
line['splitlines'] = [('create', split_lines)]
2023-06-06 13:22:57 +00:00
if split2edit is True:
2022-09-01 12:48:04 +00:00
if 'splitlines' in line.keys():
line['state'] = 'edit'
# check data
if line['bookingtype'] in ['in', 'out']:
if line.get('category', None) is None:
msg_list.append(gettext(
'cashbook_dataexchange.mds_import_no_category',
2023-06-06 13:22:57 +00:00
trdata=str(transaction)))
fail_cnt += 1
elif line['bookingtype'] in ['mvin', 'mvout']:
if line.get('booktransf', None) is None:
msg_list.append(gettext(
'cashbook_dataexchange.mds_import_no_account',
2023-06-06 13:22:57 +00:00
trdata=str(transaction)))
fail_cnt += 1
2022-09-01 12:48:04 +00:00
to_create.append(line)
return (to_create, msg_list, fail_cnt)
2022-09-01 12:48:04 +00:00
@classmethod
def qif_read_categories(cls, catdata):
""" read categories from text
result: {
'in': [{
'<Category-Name>': {
'type': 'in|out',
'childs': [...],
},
},...],
'out': [{},...],
}
"""
def add_category(catdict, namelst, ctype):
""" add category to dict
"""
if not namelst[0] in catdict.keys():
catdict[namelst[0]] = {'type': ctype, 'childs': {}}
if len(namelst) > 1:
catdict[namelst[0]]['childs'] = add_category(
catdict[namelst[0]]['childs'],
namelst[1:],
ctype)
return catdict
categories = {'in': {}, 'out': {}}
for cattxt in catdata.split('^'):
if len(cattxt.strip()) == 0:
continue
catname = None
cattype = None
for line in cattxt.strip().split('\n'):
if line.startswith('N'):
catname = line[1:].strip().split(':')
elif line.startswith('E'):
cattype = 'out'
elif line.startswith('I'):
cattype = 'in'
2023-06-06 13:22:57 +00:00
else:
raise ValueError('invalid line: %s (%s)' % (line, cattxt))
2023-06-06 13:22:57 +00:00
categories[cattype] = add_category(
categories[cattype], catname, cattype)
return categories
2022-08-31 15:32:01 +00:00
@classmethod
def qif_export_category(cls, record):
""" export single category as qif
"""
return '\n'.join([
'N%(cname)s' % {
'cname': record.rec_name.replace('/', ':'),
},
'E' if record.cattype == 'out' else 'I',
'^',
])
# end QifTool