document_incoming_invoice_xml/document.py

925 lines
36 KiB
Python
Raw Normal View History

2025-01-06 16:52:48 +00:00
# -*- coding: utf-8 -*-
# This file is part of the document-incoming-invoice-xml-module
# from m-ds for Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
# https://portal3.gefeg.com/projectdata/invoice/deliverables/installed/publishingproject/xrechnung%20%28german%20cius%29/xrechnung_crossindustryinvoice%3B%202017-10-18.scm/html/021.htm?https://portal3.gefeg.com/projectdata/invoice/deliverables/installed/publishingproject/xrechnung%20%28german%20cius%29/xrechnung_crossindustryinvoice%3B%202017-10-18.scm/html/025.htm
import os.path
2025-01-15 15:26:01 +00:00
import json
2025-01-17 11:53:11 +00:00
import copy
2025-01-06 16:52:48 +00:00
from lxml import etree
2025-01-10 16:25:05 +00:00
from datetime import datetime, date
2025-01-09 16:01:34 +00:00
from decimal import Decimal
from trytond.pool import PoolMeta, Pool
2025-01-15 15:26:01 +00:00
from trytond.report import Report
2025-01-06 16:52:48 +00:00
from trytond.transaction import Transaction
from trytond.exceptions import UserError
from trytond.i18n import gettext
from trytond.model import fields
from trytond.pyson import Eval
2025-01-15 15:26:01 +00:00
from trytond.protocols.jsonrpc import JSONEncoder
2025-01-06 16:52:48 +00:00
# Known xml-invoice formats, tried in this order by the content detection.
# Each entry is a tuple of:
#   - path components of the xsd file, relative to the directory of
#     this module
#   - display name of the format (stored in field 'xsd_type')
#   - suffix of the reader method '_readxml_<suffix>',
#     empty string if no reader is implemented
# Every path component must be relative and must not contain a path
# separator: an absolute component would make os.path.join() discard
# the module directory in front of it.
xml_types = [
    (['xsd', 'Factur-X_1.07.2_EXTENDED', 'Factur-X_1.07.2_EXTENDED.xsd'],
        'Factur-X extended', 'facturx_extended'),
    (['xsd', 'Factur-X_1.07.2_EN16931', 'Factur-X_1.07.2_EN16931.xsd'],
        'Factur-X EN16931', ''),
    (['xsd', 'Factur-X_1.07.2_BASIC', 'Factur-X_1.07.2_BASIC.xsd'],
        'Factur-X basic', ''),
    (['xsd', 'Factur-X_1.07.2_BASICWL', 'Factur-X_1.07.2_BASICWL.xsd'],
        'Factur-X basic-wl', ''),
    (['xsd', 'Factur-X_1.07.2_MINIMUM', 'Factur-X_1.07.2_MINIMUM.xsd'],
        'Factur-X minimum', ''),
    (['xsd', 'CII D22B XSD', 'CrossIndustryInvoice_100pD22B.xsd'],
        'CrossIndustryInvoice D22', ''),
    (['xsd', 'os-UBL-2.1', 'xsd', 'maindoc', 'UBL-Invoice-2.1.xsd'],
        'XRechnung - Invoice', ''),
    (['xsd', 'os-UBL-2.1', 'xsd', 'maindoc', 'UBL-CreditNote-2.1.xsd'],
        'XRechnung - Credit Note', ''),
    (['xsd', 'os-UBL-2.1', 'xsd', 'maindoc', 'UBL-DebitNote-2.1.xsd'],
        'XRechnung - Debit Note', '')]
class Incoming(metaclass=PoolMeta):
    # extends the model 'document.incoming'
    __name__ = 'document.incoming'

    # display name of the detected xml format, written while processing
    # a supplier invoice; hidden unless the mime type is 'application/xml'
    xsd_type = fields.Char(
        readonly=True, string='XML Content',
        states={
            'invisible': Eval('mime_type', '') != 'application/xml',
            })
@classmethod
def default_company(cls):
    """ default of 'company': value of key 'company' in the context
        of the current transaction, None if the key is missing
    """
    context = Transaction().context
    return context.get('company')
def _process_supplier_invoice(self):
    """ create the supplier invoice, detect the xml format of 'data'
        and fill the invoice with the values read from the xml

    Raises:
        UserError: if no reader exists for the detected xml format

    Returns:
        record: invoice created by the parent implementation,
            updated with xml values if an xml format was detected
    """
    invoice = super()._process_supplier_invoice()
    self.xsd_type = None

    # only xml documents are handled here
    if self.mime_type != 'application/xml':
        return invoice

    detected = self._facturx_detect_content()
    if not detected:
        return invoice

    format_name, reader_suffix, tree = detected
    self.xsd_type = format_name

    # reader methods are named '_readxml_<suffix>'
    reader = getattr(self, '_readxml_%s' % reader_suffix, None)
    if not reader:
        raise UserError(gettext(
            'document_incoming_invoice_xml.msg_not_implemented',
            xmltype=format_name))

    # read xml data, write to 'self.parsed_data'
    reader(tree)
    # update invoice with imported data
    return self._readxml_update_invoice(invoice)
2025-01-09 16:01:34 +00:00
def _readxml_xpath(self, tags):
""" generate xpath
Args:
tags (list): list of string or integer to build path
"""
parts = []
for x in tags:
if isinstance(x, str):
parts.append(x)
elif isinstance(x, int):
if parts[-1].endswith(']'):
raise ValueError('multiple list selector')
parts[-1] += '[%d]' % x
result = '/' + '/'.join(parts)
return result
2025-01-06 16:52:48 +00:00
def _readxml_getvalue(
        self, xmltree, tags, vtype=None, allow_list=False):
    """ read 'text'-part from xml-xpath,
        convert to 'vtype'

    Args:
        xmltree (xmldata): XML-tree
        tags (list): list of tags to build xpath
        vtype (type-class, optional): to convert value of text-part,
            'date' is converted by '_readxml_convertdate'.
            Defaults to None (text is returned unchanged).
        allow_list (boolean, optional): get result as list of values,
            Defaults to False.

    Raises:
        UserError: if the text cannot be converted to 'vtype'

    Returns:
        various: converted value of the first node or None;
            list of converted values if 'allow_list' is set
    """
    def convert(text):
        """ convert text of a single node """
        if vtype is None:
            return text
        if vtype == date:
            return self._readxml_convertdate(text)
        # node without text: nothing to convert
        if text is None:
            return None
        try:
            return vtype(text)
        except (TypeError, ValueError, ArithmeticError):
            # ArithmeticError covers decimal.InvalidOperation
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_convert_error',
                msg='cannot convert %(val)s' % {'val': text}))

    # xpath of lxml rejects the empty prefix of a default namespace
    namespaces = {
        prefix: uri
        for prefix, uri in xmltree.nsmap.items() if prefix}
    nodes = xmltree.xpath(
        self._readxml_xpath(tags), namespaces=namespaces)

    if allow_list:
        return [convert(node.text) for node in nodes]
    if nodes:
        return convert(nodes[0].text)
    return None
def _readxml_getattrib(self, xmltree, tags, attrib, vtype=None):
    """ read attribute from xml-xpath,
        convert to 'vtype'

    Args:
        xmltree (xmldata): XML-tree
        tags (list): list of tags to build xpath
        attrib (str): name of attribute to read
        vtype (type-class, optional): to convert value. Defaults to None.

    Returns:
        various: converted value of the attribute at the first
            matching node, None if node or attribute is missing
    """
    # xpath of lxml rejects the empty prefix of a default namespace
    namespaces = {
        prefix: uri
        for prefix, uri in xmltree.nsmap.items() if prefix}
    nodes = xmltree.xpath(
        self._readxml_xpath(tags), namespaces=namespaces)
    if not nodes:
        return None

    value = nodes[0].attrib.get(attrib, None)
    # empty values are returned without conversion
    if value and vtype:
        value = vtype(value)
    return value
def _readxml_find_party(self, party_data):
    """ search for a party matching the data read from xml-file

    The name is searched case-insensitive as substring. If postal code,
    street and city are all available, the party must also own a
    matching address (first line of street used as prefix).

    Args:
        party_data (dict): data of party read from xml

    Returns:
        int: id of party if exactly one party matches, otherwise None
    """
    pool = Pool()
    Party = pool.get('party.party')
    Address = pool.get('party.address')

    # nothing to search for without a name
    if not party_data['name']:
        return None

    domain = [('name', 'ilike', '%{}%'.format(party_data['name']))]

    if {'postal_code', 'street', 'city'}.issubset(party_data.keys()):
        first_street_line = party_data['street'].split('\n')[0]
        # sub-query, 'ilike' ignores capitalization
        address_query = Address.search([
            ('city', 'ilike', party_data['city']),
            ('postal_code', '=', party_data['postal_code']),
            ('street', 'ilike', first_street_line + '%')],
            query=True)
        domain.append(('addresses', 'in', address_query))

    matches = Party.search(domain)
    # only an unambiguous result is accepted
    if len(matches) == 1:
        return matches[0].id
    return None
def _readxml_create_party(self, partydata):
    """ create a party (with one address, if address values exist)
        from data read from xml

    Args:
        partydata (dict): data of party, read from xml

    Returns:
        int: id of created party, None if 'partydata' has no name
    """
    pool = Pool()
    Party = pool.get('party.party')
    Invoice = pool.get('account.invoice')
    Company = pool.get('company.company')

    if not partydata['name']:
        return None

    address = {}
    for fname in (
            'street', 'postal_code', 'city', 'country', 'subdivision'):
        if fname in partydata:
            address[fname] = partydata[fname]

    # without country in xml: fall back to the country of the
    # company address
    if 'country' not in address:
        company_id = Invoice.default_company()
        if company_id:
            company_address = Company(company_id).party.address_get()
            if company_address and company_address.country:
                address['country'] = company_address.country.id

    values = {'name': partydata['name']}
    if address:
        values['addresses'] = [('create', [address])]

    party, = Party.create([values])
    return party.id
def _readxml_party_data(self, xmltree, tags, create_party=False):
    """ read name and postal address of a party from xml,
        find the matching party (optionally create it)

    Args:
        xmltree (xmldata): XML-tree
        tags (list): tags to build xpath of the party
        create_party (boolean, optional): create party if not found

    Returns:
        dict: data of party; always contains 'name', the keys
            'postal_code', 'street', 'city', 'country', 'subdivision'
            and 'party' exist only if a value was found
    """
    pool = Pool()
    Country = pool.get('country.country')
    SubDivision = pool.get('country.subdivision')

    xpath_address = tags + ['ram:PostalTradeAddress']

    def read_address(tag):
        """ text of a child of the postal address """
        return self._readxml_getvalue(xmltree, xpath_address + [tag])

    # name
    party = {
        'name': self._readxml_getvalue(xmltree, tags + ['ram:Name'])}

    postal_code = read_address('ram:PostcodeCode')
    if postal_code:
        party['postal_code'] = postal_code

    # address, max. 3 lines, empty lines are dropped
    street_lines = [
        read_address(tag)
        for tag in ('ram:LineOne', 'ram:LineTwo', 'ram:LineThree')]
    street = '\n'.join(line for line in street_lines if line)
    if street:
        party['street'] = street

    # city
    city = read_address('ram:CityName')
    if city:
        party['city'] = city

    # country, searched by upper-case code
    country_code = read_address('ram:CountryID')
    if country_code:
        countries = Country.search([('code', '=', country_code.upper())])
        if countries:
            party['country'] = countries[0].id

    # subdivision, only searched within a known country
    subdivision_name = read_address('ram:CountrySubDivisionName')
    if subdivision_name and 'country' in party:
        subdivisions = SubDivision.search([
            ('name', '=', subdivision_name),
            ('country', '=', party['country'])])
        if subdivisions:
            party['subdivision'] = subdivisions[0].id

    party_id = self._readxml_find_party(party)
    if not party_id and create_party:
        party_id = self._readxml_create_party(party)
    if party_id:
        party['party'] = party_id
    return party
2025-01-17 11:12:13 +00:00
def _readxml_update_invoice(self, invoice):
    """ update invoice with parsed_data

    Sets number or reference (depending on configuration), invoice date,
    description/comment (from the notes), party and invoice lines.

    Args:
        invoice (record): model account.invoice

    Raises:
        UserError: if no supplier party was found for the seller or
            if the buyer is not the party of our company (unless the
            configuration accepts other companies)

    Returns:
        record: model account.invoice
    """
    Configuration = Pool().get('document.incoming.configuration')
    config = Configuration.get_singleton()
    parsed = self.parsed_data

    def party_text(party_values):
        """ values of a party as single line for error messages,
            values may be ids (country, subdivision) or None (name)
        """
        return ', '.join(
            str(value).replace('\n', '; ')
            for value in party_values.values()
            if value is not None)

    invoice_number = parsed.get('invoice_number', None)
    if config and config.number_target == 'number':
        invoice.number = invoice_number
    else:
        invoice.reference = invoice_number
    invoice.invoice_date = parsed.get('invoice_date', None)

    # 1st note --> 'description', 2nd ff. --> 'comment'
    note_list = parsed.get('note_list', None)
    if note_list:
        invoice.description = note_list[0].get('Content', None)
        invoice.comment = '\n'.join([
            '%(code)s%(subj)s%(msg)s' % {
                'code': ('Code=%s, ' % note.get('ContentCode', ''))
                if note.get('ContentCode', '') else '',
                'subj': ('Subject=%s, ' % note.get('SubjectCode', ''))
                if note.get('SubjectCode', '') else '',
                # the key may exist with value None
                'msg': note.get('Content', '') or '',
                } for note in note_list[1:]])

    seller_party = parsed.get('seller_party', None)
    if seller_party:
        if 'party' not in seller_party:
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_no_supplierparty',
                partytxt=party_text(seller_party)))
        invoice.party = seller_party['party']
        invoice.on_change_party()

    # check if we found our company
    buyer_party = parsed.get('buyer_party', None)
    if buyer_party and config and not config.accept_other_company:
        company_party_id = self._readxml_find_party(buyer_party)
        if not (company_party_id and
                (company_party_id == self.company.party.id)):
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_not_our_company',
                partytxt=party_text(buyer_party)))

    # the line reader consumes its input, work on a copy
    lines_data = copy.deepcopy(parsed.get('lines_data', None))
    if lines_data:
        lines = [
            self._readxml_getinvoiceline(invoice, line_data)
            for line_data in lines_data]
        for sequence, line in enumerate(lines, start=1):
            line.sequence = sequence
        invoice.lines = lines
        invoice.on_change_lines()
    return invoice
def _readxml_facturx_extended(self, xmltree):
    """ read factur-x extended, store the values in 'self.parsed_data'

    Stored keys: 'invoice_number', 'invoice_date', 'note_list',
    'seller_party', 'buyer_party', 'lines_data', 'payment'.

    Args:
        xmltree (xmldata): XML-tree of the invoice

    Raises:
        UserError: if the type-code is not 380 or 381 or
            if the format of the invoice-date is not '102'
    """
    Configuration = Pool().get('document.incoming.configuration')
    config = Configuration.get_singleton()

    # collect the values in a private copy and assign it at the end,
    # the current value of the field is never modified in place
    parsed_data = getattr(self, 'parsed_data', None)
    parsed_data = (
        dict(parsed_data) if isinstance(parsed_data, dict) else {})

    # xpath of lxml rejects the empty prefix of a default namespace
    namespaces = {
        prefix: uri
        for prefix, uri in xmltree.nsmap.items() if prefix}

    def count_nodes(tags):
        """ number of elements at xpath of 'tags' """
        return len(xmltree.xpath(
            self._readxml_xpath(tags), namespaces=namespaces))

    # rsm:CrossIndustryInvoice
    xpath_cross_ind = ['rsm:CrossIndustryInvoice']
    # rsm:CrossIndustryInvoice/rsm:ExchangedDocument
    xpath_exchg_doc = xpath_cross_ind + ['rsm:ExchangedDocument']

    # check invoice-type:
    # allowed codes 380 invoice, 381 credit note
    inv_code = self._readxml_getvalue(
        xmltree, xpath_exchg_doc + ['ram:TypeCode'], int)
    if inv_code not in [380, 381]:
        raise UserError(gettext(
            'document_incoming_invoice_xml.msg_convert_error',
            msg='invalid type-code: %(code)s (expect: 380, 381)' % {
                'code': str(inv_code)}))

    parsed_data['invoice_number'] = self._readxml_getvalue(
        xmltree, xpath_exchg_doc + ['ram:ID'])

    # invoice-date, only format '102' is accepted
    date_path = xpath_exchg_doc + [
        'ram:IssueDateTime', 'udt:DateTimeString']
    date_format = self._readxml_getattrib(xmltree, date_path, 'format')
    if date_format != '102':
        raise UserError(gettext(
            'document_incoming_invoice_xml.msg_convert_error',
            msg='invalid date-format: %(code)s (expect: 102)' % {
                'code': str(date_format)}))
    parsed_data['invoice_date'] = self._readxml_convertdate(
        self._readxml_getvalue(xmltree, date_path))

    # IncludedNote: read all notes and their fields
    xpath_notes = xpath_exchg_doc + ['ram:IncludedNote']
    note_list = [
        {
            field: self._readxml_getvalue(
                xmltree, xpath_notes + [pos, 'ram:%s' % field])
            for field in ['Content', 'ContentCode', 'SubjectCode']}
        for pos in range(1, count_nodes(xpath_notes) + 1)]
    if note_list:
        parsed_data['note_list'] = note_list

    # rsm:CrossIndustryInvoice/rsm:SupplyChainTradeTransaction
    xpath_suppl_chain = xpath_cross_ind + [
        'rsm:SupplyChainTradeTransaction']
    # rsm:CrossIndustryInvoice/
    #   rsm:SupplyChainTradeTransaction/
    #     ram:ApplicableHeaderTradeAgreement
    xpath_appl_head_agree = xpath_suppl_chain + [
        'ram:ApplicableHeaderTradeAgreement']

    # supplier party, created if missing and enabled in configuration
    add_party = config and config.create_supplier
    seller_party = self._readxml_party_data(
        xmltree, xpath_appl_head_agree + ['ram:SellerTradeParty'],
        create_party=add_party)
    if seller_party:
        parsed_data['seller_party'] = seller_party

    # company party, never created
    buyer_party = self._readxml_party_data(
        xmltree, xpath_appl_head_agree + ['ram:BuyerTradeParty'],
        create_party=False)
    if buyer_party:
        parsed_data['buyer_party'] = buyer_party

    # invoice - lines
    # rsm:CrossIndustryInvoice/
    #   rsm:SupplyChainTradeTransaction/
    #     ram:IncludedSupplyChainTradeLineItem
    xpath_line_item = xpath_suppl_chain + [
        'ram:IncludedSupplyChainTradeLineItem']
    lines_data = [
        self._readxml_invoice_line(xmltree, xpath_line_item, pos)
        for pos in range(1, count_nodes(xpath_line_item) + 1)]
    if lines_data:
        parsed_data['lines_data'] = lines_data

    # payment data
    xpath_payment = xpath_suppl_chain + [
        'ram:ApplicableHeaderTradeSettlement']
    payment = {
        'reference': self._readxml_getvalue(
            xmltree, xpath_payment + ['ram:PaymentReference']),
        'currency': self._readxml_getvalue(
            xmltree, xpath_payment + ['ram:InvoiceCurrencyCode']),
        }

    # bank accounts: (key in result, tags below the payment means)
    xpath_bankaccounts = xpath_payment + [
        'ram:SpecifiedTradeSettlementPaymentMeans']
    bank_fields = [
        ('info', ['ram:Information']),
        ('type', ['ram:TypeCode']),
        # debitor
        ('debitor_iban', [
            'ram:PayerPartyDebtorFinancialAccount', 'ram:IBANID']),
        # creditor
        ('creditor_iban', [
            'ram:PayeePartyCreditorFinancialAccount', 'ram:IBANID']),
        ('creditor_name', [
            'ram:PayeePartyCreditorFinancialAccount',
            'ram:AccountName']),
        # financial card
        ('card_id', [
            'ram:ApplicableTradeSettlementFinancialCard', 'ram:ID']),
        ('card_holder_name', [
            'ram:ApplicableTradeSettlementFinancialCard',
            'ram:CardholderName']),
        # bank name
        ('institution', [
            'ram:PayeeSpecifiedCreditorFinancialInstitution',
            'ram:BICID']),
        ]
    bank_accounts = []
    for pos in range(1, count_nodes(xpath_bankaccounts) + 1):
        bank_account = {
            key: self._readxml_getvalue(
                xmltree, xpath_bankaccounts + [pos] + sub_tags)
            for key, sub_tags in bank_fields}
        # add to list of bank accounts, skip empty values
        bank_accounts.append({
            key: value
            for key, value in bank_account.items() if value})
    if bank_accounts:
        payment['bank'] = bank_accounts
    parsed_data['payment'] = payment

    self.parsed_data = parsed_data
2025-01-06 16:52:48 +00:00
2025-01-10 16:25:05 +00:00
def _readxml_getinvoiceline(self, invoice, line_data):
    """ create invoice line in memory

    Used values are removed from 'line_data'; the remaining values
    are appended as JSON to the note of the line.

    Args:
        invoice (record): model account.invoice, the line is
            created for this invoice
        line_data (dict): values from xml to create
            invoice line, modified in place

    Raises:
        UserError: if no product category is configured, no tax
            matches, the number of taxes differs, the invoice has no
            currency or the line amount differs from the xml amount

    Returns:
        record: model account.invoice.line (not saved)
    """
    pool = Pool()
    Line = pool.get('account.invoice.line')
    Tax = pool.get('account.tax')
    Uom = pool.get('product.uom')
    Configuration = pool.get('document.incoming.configuration')
    ModelData = pool.get('ir.model.data')
    cfg1 = Configuration.get_singleton()

    def get_tax_by_percent(percent):
        """ search for tax by percent of supplier tax

        Args:
            percent (Decimal): tax percent, 7% --> 0.07

        Raises:
            UserError: if no product category is configured or
                no matching tax was found

        Returns:
            tuple: (model 'account.tax', model 'product.category')
        """
        if not (cfg1 and cfg1.product_category):
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_no_prodcat_configured'))

        for pcat in cfg1.product_category:
            for s_tax in pcat.supplier_taxes:
                # get current taxes of supplier-tax at
                # invoice-date
                current_taxes = Tax.compute(
                    [s_tax], Decimal('1'), 1.0, invoice.invoice_date)
                # skip result of multiple or none taxes
                if len(current_taxes) != 1:
                    continue
                if (current_taxes[0]['tax'].rate == percent) and (
                        current_taxes[0]['tax'].type == 'percentage'):
                    # found it
                    return (s_tax, pcat)

        # no product category found
        raise UserError(gettext(
            'document_incoming_invoice_xml.msg_no_prodcat_found',
            percent=percent * Decimal('100.0')))

    # uom: default is 'unit', replaced by the last attribute whose
    # 'uom' value matches the symbol of a known unit
    xml_uom = Uom(ModelData.get_id('product', 'uom_unit'))
    for x_attr in line_data.get('attributes', []):
        x_uom = x_attr.get('uom', None)
        if not x_uom:
            continue
        units = Uom.search([('symbol', '=', x_uom)])
        if units:
            xml_uom = units[0]

    line = Line(
        invoice=invoice,
        type='line',
        unit=xml_uom,
        quantity=line_data.get('quantity', {}).pop('billed', None),
        unit_price=line_data.get('unit_net_price', {}).pop('amount', None))

    # description
    line_no = line_data.pop('line_no', None)
    descr = [
        '[%s]' % line_no if line_no else None,
        line_data.pop('name', None),
        line_data.pop('description', None)]
    line.description = '; '.join([x for x in descr if x])

    # get taxes and product-category from settings
    tax_and_category = []
    line_taxes = line_data.pop('taxes', [])
    for line_tax in line_taxes:
        percent = line_tax.get('percent', None)
        if (line_tax.get('type', '') == 'VAT') and (percent is not None):
            percent = percent / Decimal('100')
            tax_and_category.append(get_tax_by_percent(percent))

    # check result: every tax of the xml must have been resolved
    if len(line_taxes) != len(tax_and_category):
        raise UserError(gettext(
            'document_incoming_invoice_xml.msg_numtaxes_invalid',
            descr=line.description,
            taxin='|'.join([
                str(x.get('percent', '-')) for x in line_taxes]),
            taxout='|'.join([
                # the rate of the supplier tax itself may be empty
                '-' if x[0].rate is None
                else str(x[0].rate * Decimal('100.0'))
                for x in tax_and_category])))

    # set taxes and account for product
    line.taxes = [x[0] for x in tax_and_category]
    expense_accounts = [
        x[1].account_expense
        for x in tax_and_category if x[1].account_expense]
    if expense_accounts:
        line.account = expense_accounts[0]

    # currency is required to format amounts, do not rely on
    # 'assert' (removed when python runs with -O)
    if invoice.currency is None:
        raise UserError(gettext(
            'document_incoming_invoice_xml.msg_convert_error',
            msg='invoice has no currency'))

    # check if calculated 'amount' matches with amount from xml-data
    xml_amount = line_data.get('total', {}).pop('amount', None)
    if xml_amount is not None:
        if xml_amount != line.get_amount(None):
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_line_amount_invalid',
                linetxt=line.description,
                calcsum=Report.format_currency(
                    line.get_amount(None), None, invoice.currency),
                xmlsum=Report.format_currency(
                    xml_amount, None, invoice.currency)))

    # cleanup used values, the keys may be missing
    for key in ['quantity', 'unit_net_price', 'total']:
        if not line_data.get(key):
            line_data.pop(key, None)

    # note of line
    notes = [line_data.pop('line_note', None)]

    # skipped xml-data
    convert_notes = line_data.pop('convert_note', None)
    if convert_notes:
        if not isinstance(convert_notes, list):
            convert_notes = [str(convert_notes)]
        notes.extend(
            [' ', gettext('document_incoming_invoice_xml.msg_convert_note')]
            + convert_notes)

    # values not used to create invoice-line
    if line_data:
        notes.extend([
            ' ',
            gettext(
                'document_incoming_invoice_xml.msg_unused_linevalues'),
            json.dumps(line_data, cls=JSONEncoder, indent=3)])
    line.note = '\n'.join(x for x in notes if x)

    line.on_change_invoice()
    return line
2025-01-17 11:53:11 +00:00
def _readxml_read_listdata(self, xmldata, xpth, xtag, key_list):
    """ read a repeated xml-element as list of dict

    Args:
        xmldata (xtree): xml-tree
        xpth (list): list of xml-tags in front of the repeated element
        xtag (str or list): xml-tag(s) of the repeated element,
            appended to 'xpth'
        key_list (list): [('<xtag>', '<key>')] or
            [('<xtag>', '<key>', <type>)], child-tag to read,
            key in result and optional type to convert the value

    Returns:
        list: list of dict, one dict per element, values of None and
            elements without any value are skipped
    """
    tag_path = [xtag] if isinstance(xtag, str) else xtag
    xpath_list = xpth + tag_path

    # number of elements, they are addressed by position below
    item_count = len(xmldata.xpath(
        self._readxml_xpath(xpath_list), namespaces=xmldata.nsmap))

    listdata = []
    for pos in range(1, item_count + 1):
        values = {}
        for entry in key_list:
            if len(entry) == 3:
                xkey, key, xtype = entry
            else:
                # type is optional
                xkey, key = entry
                xtype = None

            value = self._readxml_getvalue(
                xmldata, xpath_list + [pos, xkey], vtype=xtype)
            if value is not None:
                values[key] = value
        if values:
            listdata.append(values)
    return listdata
2025-01-09 16:01:34 +00:00
def _readxml_invoice_line(self, xmldata, xpth, pos):
    """ read invoice-line from xml

    Elements which exist in the xml but are not imported are listed
    in 'convert_note' as 'skip: <xpath>'.

    Args:
        xmldata (xmldata): xml-tree object
        xpth (list): xpath to invoice-line
        pos (int): number of line to read

    Returns:
        dict: invoice line data, empty values are removed
    """
    xpath_line = xpth + [pos]
    result = {'convert_note': []}

    # xpath of lxml rejects the empty prefix of a default namespace
    namespaces = {
        prefix: uri
        for prefix, uri in xmldata.nsmap.items() if prefix}

    def node_exists(tags):
        """ check for the element itself: the text of a container
            element depends on the formatting of the xml
        """
        return bool(xmldata.xpath(
            self._readxml_xpath(tags), namespaces=namespaces))

    def note_skipped(base_tags, names):
        """ add a note for each existing but ignored element """
        for name in names:
            xp_to_check = base_tags + ['ram:' + name]
            if node_exists(xp_to_check):
                result['convert_note'].append(
                    'skip: ' + self._readxml_xpath(xp_to_check))

    def first_item(tags, key_list):
        """ values of the first element below the line,
            None if there is no element with values
        """
        items = self._readxml_read_listdata(
            xmldata, xpath_line, tags, key_list)
        return items[0] if items else None

    result['line_no'] = self._readxml_getvalue(xmldata, xpath_line + [
        'ram:AssociatedDocumentLineDocument', 'ram:LineID'])
    # notes without text are skipped
    result['line_note'] = '\n'.join(
        text for text in self._readxml_getvalue(
            xmldata, xpath_line + [
                'ram:AssociatedDocumentLineDocument', 'ram:IncludedNote',
                'ram:Content'], allow_list=True)
        if text is not None)

    # NOTE(review): 'ram:OriginTradeCountry' looks like a container
    # element, its text may be whitespace only - confirm
    for xkey, key in [
            ('ram:ID', 'prod_id'),
            ('ram:GlobalID', 'glob_id'),
            ('ram:SellerAssignedID', 'seller_id'),
            ('ram:BuyerAssignedID', 'buyer_id'),
            ('ram:IndustryAssignedID', 'industy_id'),
            ('ram:ModelID', 'model_id'),
            ('ram:Name', 'name'),
            ('ram:Description', 'description'),
            ('ram:BatchID', 'lot'),
            ('ram:BrandName', 'brand_name'),
            ('ram:ModelName', 'model_name'),
            ('ram:OriginTradeCountry', 'trade_country')]:
        result[key] = self._readxml_getvalue(xmldata, xpath_line + [
            'ram:SpecifiedTradeProduct', xkey])

    # attributes of product
    result['attributes'] = self._readxml_read_listdata(
        xmldata, xpath_line, [
            'ram:SpecifiedTradeProduct',
            'ram:ApplicableProductCharacteristic'], [
            ('ram:TypeCode', 'code'),
            ('ram:Description', 'description'),
            ('ram:ValueMeasure', 'uom'),
            ('ram:ValueY', 'value')])

    # classification of product
    result['classification'] = self._readxml_read_listdata(
        xmldata, xpath_line, [
            'ram:SpecifiedTradeProduct',
            'ram:DesignatedProductClassification'], [
            ('ram:ClassCode', 'code'),
            ('ram:ClassName', 'name')])

    # serial-numbers of product
    result['serialno'] = self._readxml_read_listdata(
        xmldata, xpath_line, [
            'ram:SpecifiedTradeProduct',
            'ram:IndividualTradeProductInstance'], [
            ('ram:BatchID', 'lot'),
            ('ram:SupplierAssignedSerialID', 'serial')])

    # referenced product
    result['refprod'] = self._readxml_read_listdata(
        xmldata, xpath_line,
        ['ram:SpecifiedTradeProduct', 'ram:IncludedReferencedProduct'], [
            ('ram:ID', 'id'),
            ('ram:GlobalID', 'global_id'),
            ('ram:SellerAssignedID', 'seller_id'),
            ('ram:BuyerAssignedID', 'buyer_id'),
            ('ram:Name', 'name'),
            ('ram:Description', 'description'),
            ('ram:UnitQuantity', 'quantity', Decimal),
            ])

    # net price and gross price
    xpath_agreement = xpath_line + ['ram:SpecifiedLineTradeAgreement']
    for key, price_tag in [
            ('unit_net_price', 'ram:NetPriceProductTradePrice'),
            ('unit_gross_price', 'ram:GrossPriceProductTradePrice')]:
        result[key] = first_item(
            ['ram:SpecifiedLineTradeAgreement', price_tag], [
                ('ram:ChargeAmount', 'amount', Decimal),
                ('ram:BasisQuantity', 'basequantity', Decimal)])
        # notice ignored field
        note_skipped(
            xpath_agreement + [price_tag],
            ['AppliedTradeAllowanceCharge'])

    # quantity
    xpath_quantity = xpath_line + ['ram:SpecifiedLineTradeDelivery']
    result['quantity'] = first_item(
        ['ram:SpecifiedLineTradeDelivery'], [
            ('ram:BilledQuantity', 'billed', Decimal),
            ('ram:ChargeFreeQuantity', 'chargefree', Decimal),
            ('ram:PackageQuantity', 'package', Decimal),
            ])
    # notice ignored fields
    note_skipped(xpath_quantity, [
        'ShipToTradeParty', 'UltimateShipToTradeParty',
        'ActualDeliverySupplyChainEvent',
        'DespatchAdviceReferencedDocument',
        'ReceivingAdviceReferencedDocument',
        'DeliveryNoteReferencedDocument'])

    # taxes
    xpath_trade = xpath_line + ['ram:SpecifiedLineTradeSettlement']
    result['taxes'] = self._readxml_read_listdata(xmldata, xpath_line, [
        'ram:SpecifiedLineTradeSettlement',
        'ram:ApplicableTradeTax'], [
        ('ram:CalculatedAmount', 'amount', Decimal),
        ('ram:TypeCode', 'type'),
        ('ram:ExemptionReason', 'reason'),
        ('ram:BasisAmount', 'base', Decimal),
        ('ram:LineTotalBasisAmount', 'basetotal', Decimal),
        ('ram:AllowanceChargeBasisAmount', 'feebase', Decimal),
        ('ram:CategoryCode', 'category_code'),
        ('ram:ExemptionReasonCode', 'reason_code'),
        ('ram:TaxPointDate', 'taxdate', date),
        ('ram:DueDateTypeCode', 'duecode'),
        ('ram:RateApplicablePercent', 'percent', Decimal)])

    # total amounts
    result['total'] = first_item([
        'ram:SpecifiedLineTradeSettlement',
        'ram:SpecifiedTradeSettlementLineMonetarySummation'], [
        ('ram:LineTotalAmount', 'amount', Decimal),
        ('ram:ChargeTotalAmount', 'charge', Decimal),
        ('ram:AllowanceTotalAmount', 'fee', Decimal),
        ('ram:TaxTotalAmount', 'tax', Decimal),
        ('ram:GrandTotalAmount', 'grand', Decimal),
        ('ram:TotalAllowanceChargeAmount', 'feecharge', Decimal),
        ])
    # notice ignored fields
    note_skipped(xpath_trade, [
        'BillingSpecifiedPeriod', 'SpecifiedTradeAllowanceCharge',
        'ActualDeliverySupplyChainEvent',
        'InvoiceReferencedDocument',
        'AdditionalReferencedDocument',
        'ReceivableSpecifiedTradeAccountingAccount'])

    # skip None and empty values
    return {key: value for key, value in result.items() if value}
2025-01-06 16:52:48 +00:00
def _readxml_convertdate(self, date_string):
""" convert date-string to python-date
Args:
date_string (_type_): _description_
"""
if not date_string:
return None
try:
return datetime.strptime(date_string, '%Y%m%d').date()
except ValueError:
raise UserError(gettext(
'document_incoming_invoice_xml.msg_convert_error',
msg='cannot convert %(val)s' % {
'val': date_string}))
def _facturx_detect_content(self):
    """ check xml-data against xsd of XRechnung and FacturX,
        in the order of 'xml_types'; the first schema the
        document is valid for wins

    Returns:
        tuple: ('xsd type', '<function to read xml>', <xml etree>)
            defaults to None if not detected
    """
    # 'data' is an incoming document: do not expand entities
    parser = etree.XMLParser(resolve_entities=False, no_network=True)
    xml_data = etree.fromstring(self.data, parser=parser)

    module_dir = os.path.split(__file__)[0]
    for xsdpath, xsdtype, funcname in xml_types:
        # keep every component relative: an absolute component would
        # make os.path.join() drop the module directory
        parts = [part.lstrip('/') for part in xsdpath]
        fname = os.path.join(module_dir, *parts)
        schema = etree.XMLSchema(etree.parse(fname))
        # try the next schema if the document is not valid
        if schema.validate(xml_data):
            return (xsdtype, funcname, xml_data)
    return None
# end Incoming