document_incoming_invoice_xml/document.py
Frederik Jaeckel c25ad51d75 add pdf-import
2025-01-22 18:31:40 +01:00

1089 lines
43 KiB
Python

# -*- coding: utf-8 -*-
# This file is part of the document-incoming-invoice-xml-module
# from m-ds for Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
# https://portal3.gefeg.com/projectdata/invoice/deliverables/installed/publishingproject/xrechnung%20%28german%20cius%29/xrechnung_crossindustryinvoice%3B%202017-10-18.scm/html/021.htm?https://portal3.gefeg.com/projectdata/invoice/deliverables/installed/publishingproject/xrechnung%20%28german%20cius%29/xrechnung_crossindustryinvoice%3B%202017-10-18.scm/html/025.htm
import os.path
import json
import copy
import facturx
from lxml import etree
from datetime import datetime, date
from decimal import Decimal
from trytond.pool import PoolMeta, Pool
from trytond.report import Report
from trytond.transaction import Transaction
from trytond.exceptions import UserError
from trytond.i18n import gettext
from trytond.model import fields
from trytond.pyson import Eval
from trytond.protocols.jsonrpc import JSONEncoder
# Known XML schemas, tried in this order by the content detection.
# Each entry is a tuple of:
#   - path components of the XSD file, relative to the directory of
#     this module (joined with os.path.join, so no component may contain
#     a path separator)
#   - display name of the schema
#   - suffix of the reader method '_readxml_<suffix>', empty if there
#     is no reader for this schema
xml_types = [
    (['xsd', 'Factur-X_1.07.2_MINIMUM', 'Factur-X_1.07.2_MINIMUM.xsd'],
     'Factur-X minimum', 'facturx_minimal'),
    (['xsd', 'Factur-X_1.07.2_BASIC', 'Factur-X_1.07.2_BASIC.xsd'],
     'Factur-X basic', 'facturx_basic'),
    (['xsd', 'Factur-X_1.07.2_BASICWL', 'Factur-X_1.07.2_BASICWL.xsd'],
     'Factur-X basicwl', 'facturx_basicwl'),
    (['xsd', 'Factur-X_1.07.2_EN16931', 'Factur-X_1.07.2_EN16931.xsd'],
     'Factur-X EN16931', 'facturx_en16931'),
    (['xsd', 'Factur-X_1.07.2_EXTENDED', 'Factur-X_1.07.2_EXTENDED.xsd'],
     'Factur-X extended', 'facturx_extended'),
    (['xsd', 'CII D22B XSD', 'CrossIndustryInvoice_100pD22B.xsd'],
     'CrossIndustryInvoice D22', ''),
    (['xsd', 'os-UBL-2.1', 'xsd', 'maindoc', 'UBL-Invoice-2.1.xsd'],
     'XRechnung - Invoice', ''),
    (['xsd', 'os-UBL-2.1', 'xsd', 'maindoc', 'UBL-CreditNote-2.1.xsd'],
     'XRechnung - Credit Note', ''),
    (['xsd', 'os-UBL-2.1', 'xsd', 'maindoc', 'UBL-DebitNote-2.1.xsd'],
     'XRechnung - Debit Note', '')]
# Extension of the model 'document.incoming' (registered through
# PoolMeta): detects invoice XML in the document data and reads it
# into a supplier invoice.
class Incoming(metaclass=PoolMeta):
__name__ = 'document.incoming'
# read-only text with the name of the detected XML schema, written by
# '_facturx_detect_content'; hidden unless the mime type of the
# document is XML or PDF
xsd_type = fields.Char(
string='Content Data', readonly=True,
states={'invisible': ~Eval('mime_type', '').in_(
['application/xml', 'application/pdf'])})
@classmethod
def default_company(cls):
    """ get the company of the current transaction

    Returns:
        various: value of 'company' in the context of the transaction,
            None if the context has no such key
    """
    context = Transaction().context
    return context.get('company')
@classmethod
def process(cls, documents, with_children=False):
    """ run the content detection on every document, then hand over
        to the inherited processing

    Args:
        documents (list): records of model document.incoming
        with_children (bool, optional): convert sub-documents.
            Defaults to False.
    """
    # the detection stores the detected schema in 'xsd_type'
    for record in documents:
        record._facturx_detect_content()

    super().process(documents, with_children)
# Extends the inherited creation of the supplier invoice: the content of
# XML or PDF documents is detected, read into 'self.parsed_data' by the
# matching reader method and applied to the invoice.
def _process_supplier_invoice(self):
""" try to detect content of 'data', read values
"""
invoice = super()._process_supplier_invoice()
if self.mime_type in ['application/xml', 'application/pdf']:
# detect xml-content
xml_info = self._facturx_detect_content()
if xml_info:
(xsd_type, funcname, xmltree) = xml_info
# the reader method is looked up by name: '_readxml_<funcname>';
# a schema without reader is reported as not implemented
xml_read_func = getattr(self, '_readxml_%s' % funcname, None)
if not xml_read_func:
raise UserError(gettext(
'document_incoming_invoice_xml.msg_not_implemented',
xmltype=xsd_type))
# read xml data, write to 'self.parsed_data'
xml_read_func(xmltree)
# update invoice with imported data
if self.parsed_data:
invoice = self._readxml_update_invoice(invoice)
# save before the check: the check reads the amounts from the invoice
invoice.save()
self._readxml_check_invoice(invoice)
return invoice
def _readxml_check_invoice(self, invoice):
    """ check if calculated totals match with data from xml

    Compares 'taxbase', 'taxtotal' and 'grand' of the 'total'-section
    in 'self.parsed_data' with 'untaxed_amount', 'tax_amount' and
    'total_amount' of the invoice. A value missing in the
    totals-section is compared as zero.

    Args:
        invoice (record): model account.invoice

    Raises:
        UserError: if the xml-data has no totals-section or if
            calculated values dont match with xml-values
    """
    totals = self.parsed_data.get('total', None)
    if not totals:
        # the message-id needs the module name as prefix, like all
        # other messages raised by this model
        raise UserError(gettext(
            'document_incoming_invoice_xml.msg_convert_error',
            msg='no totals-section in xml-data'))

    for xfield, inv_field in [
            ('taxbase', 'untaxed_amount'),
            ('taxtotal', 'tax_amount'),
            ('grand', 'total_amount')]:
        xml_val = totals.get(xfield, Decimal('0.0'))
        inv_val = getattr(invoice, inv_field)
        if xml_val != inv_val:
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_convert_error',
                msg=' '.join([
                    inv_field + ' mismatch',
                    'from-xml=' + Report.format_currency(
                        xml_val, None, invoice.currency),
                    'calculated=' + Report.format_currency(
                        inv_val, None, invoice.currency)
                    ])))
def _readxml_xpath(self, tags):
    """ generate absolute xpath from a list of tags

    A string is added as a step of the path, an integer is added as
    list selector '[n]' to the step before it. Items of other types
    are ignored.

    Args:
        tags (list): list of string or integer to build path

    Raises:
        ValueError: if an integer has no step before it or follows
            a step which already has a list selector

    Returns:
        str: xpath, e.g. ['a', 2, 'b'] --> '/a[2]/b'
    """
    parts = []
    for tag in tags:
        if isinstance(tag, str):
            parts.append(tag)
        elif isinstance(tag, int):
            # a selector needs a step to attach to
            if not parts:
                raise ValueError('list selector without tag')
            if parts[-1].endswith(']'):
                raise ValueError('multiple list selector')
            parts[-1] += '[%d]' % tag
    return '/' + '/'.join(parts)
def _readxml_getvalue(
        self, xmltree, tags, vtype=None, allow_list=False):
    """ read 'text'-part from xml-xpath,
        convert to 'vtype'

    Args:
        xmltree (xmldata): XML-tree
        tags (list): list of tags to build xpath
        vtype (type-class, optional): to convert value of text-part,
            'date' is converted by '_readxml_convertdate'.
            Defaults to None.
        allow_list (boolean, optional): get result as list of values,
            Defaults to False.

    Returns:
        various: converted value of the first node or None,
            list of converted values if 'allow_list' is set;
            a node without text gives None
    """
    def convert(text):
        """ convert text of a node, a missing text stays None
        """
        # an element without text has text=None, passing this to
        # int() or Decimal() would raise a TypeError
        if (vtype is None) or (text is None):
            return text
        if vtype == date:
            return self._readxml_convertdate(text)
        return vtype(text)

    nodes = xmltree.xpath(
        self._readxml_xpath(tags), namespaces=xmltree.nsmap)

    if allow_list:
        return [convert(node.text) for node in nodes]

    # only the first node is read and converted
    for node in nodes:
        return convert(node.text)
    return None
def _readxml_getattrib(self, xmltree, tags, attrib, vtype=None):
    """ read an attribute of the first node at the xml-xpath,
        convert to 'vtype'

    Args:
        xmltree (xmldata): XML-tree
        tags (list): list of tags to build xpath
        attrib (str): name of attribute to read
        vtype (type-class, optional): to convert value. Defaults to None.

    Returns:
        various: converted value, the unconverted value if it is
            empty, None if node or attribute dont exist
    """
    matches = xmltree.xpath(
        self._readxml_xpath(tags), namespaces=xmltree.nsmap)
    if not matches:
        return None

    raw = matches[0].attrib.get(attrib, None)
    # empty values are returned as they are
    if not raw or not vtype:
        return raw
    return vtype(raw)
def _readxml_find_party(self, party_data):
    """ search for a party matching the data from the xml-file

    The name is searched case-insensitive as part of the party name.
    If postal code, street and city are known, the party must have
    an address with this postal code, this city and a street that
    begins with the first line of the street from the xml-file.

    Args:
        party_data (dict): data of party read from xml

    Returns:
        int: id of party, None if there is no name or if not
            exactly one party was found
    """
    pool = Pool()
    Party = pool.get('party.party')
    Address = pool.get('party.address')

    name = party_data['name']
    if not name:
        return None

    domain = [('name', 'ilike', '%%%s%%' % (name,))]

    if {'postal_code', 'street', 'city'}.issubset(party_data.keys()):
        first_street_line = party_data['street'].split('\n')[0]
        # ignore capitalization
        address_query = Address.search([
            ('city', 'ilike', party_data['city']),
            ('postal_code', '=', party_data['postal_code']),
            ('street', 'ilike', first_street_line + '%')],
            query=True)
        domain.append(('addresses', 'in', address_query))

    found = Party.search(domain)
    # we use this party if its a exact match
    if len(found) == 1:
        return found[0].id
    return None
def _readxml_create_party(self, partydata):
    """ create a party with address from the xml-data

    Args:
        partydata (dict): data of party, read from xml

    Returns:
        int: id of created party, None if the data has no name
    """
    pool = Pool()
    Party = pool.get('party.party')
    Invoice = pool.get('account.invoice')
    Company = pool.get('company.company')

    if not partydata['name']:
        return None

    address = {}
    for key in ('street', 'postal_code', 'city', 'country',
                'subdivision'):
        if key in partydata:
            address[key] = partydata[key]

    # if no country, we use the company-country
    if 'country' not in address:
        company_id = Invoice.default_company()
        if company_id:
            company_address = Company(company_id).party.address_get()
            if company_address and company_address.country:
                address['country'] = company_address.country.id

    values = {'name': partydata['name']}
    if address:
        values['addresses'] = [('create', [address])]

    (party,) = Party.create([values])
    return party.id
def _readxml_party_data(self, xmltree, tags, create_party=False):
    """ read name and postal address of a party from the xml,
        find the matching party

    Args:
        xmltree (xmldata): XML-tree
        tags (list): tags to build xpath of the party
        create_party (boolean, optional): create party if not found

    Returns:
        dict: data of party; always has the key 'name', the keys
            'postal_code', 'street', 'city', 'country', 'subdivision'
            and 'party' only if a value was found
    """
    pool = Pool()
    Country = pool.get('country.country')
    SubDivision = pool.get('country.subdivision')

    address_tags = tags + ['ram:PostalTradeAddress']

    def read_address(tag):
        """ text of a tag below the postal address
        """
        return self._readxml_getvalue(xmltree, address_tags + [tag])

    result = {
        'name': self._readxml_getvalue(xmltree, tags + ['ram:Name'])}

    postal_code = read_address('ram:PostcodeCode')
    if postal_code:
        result['postal_code'] = postal_code

    # address, max. 3 lines
    street_lines = [
        read_address(tag)
        for tag in ('ram:LineOne', 'ram:LineTwo', 'ram:LineThree')]
    street = '\n'.join(line for line in street_lines if line)
    if street:
        result['street'] = street

    city = read_address('ram:CityName')
    if city:
        result['city'] = city

    # country, searched by upper-case code
    country_code = read_address('ram:CountryID')
    if country_code:
        countries = Country.search([('code', '=', country_code.upper())])
        if countries:
            result['country'] = countries[0].id

    # subdivision, only searched within a known country
    subdivision = read_address('ram:CountrySubDivisionName')
    if subdivision and ('country' in result):
        subdivisions = SubDivision.search([
            ('name', '=', subdivision),
            ('country', '=', result['country'])])
        if subdivisions:
            result['subdivision'] = subdivisions[0].id

    party_id = self._readxml_find_party(result)
    if not party_id and create_party:
        party_id = self._readxml_create_party(result)
    if party_id:
        result['party'] = party_id
    return result
# Applies the values of 'self.parsed_data' to the invoice: number or
# reference, invoice date, description and comment, supplier party and
# invoice lines. Raises UserError if the supplier party is unknown or
# if the buyer of the xml is not the company of the document.
def _readxml_update_invoice(self, invoice):
""" update invoice with parsed_data
Args:
invoice (record): model account.invoice
Returns:
record: model account.invoice
"""
Configuration = Pool().get('document.incoming.configuration')
config = Configuration.get_singleton()
# the configuration selects the field which gets the invoice number
# of the xml: 'number' or (default) 'reference'
if config and config.number_target == 'number':
invoice.number = self.parsed_data.get('invoice_number', None)
else:
invoice.reference = self.parsed_data.get('invoice_number', None)
invoice.invoice_date = self.parsed_data.get('invoice_date', None)
# the content of the first note is used as description, all further
# notes are joined to the comment, one line per note, prefixed by
# content-code and subject-code if the note has them
note_list = self.parsed_data.get('note_list', None)
if note_list:
invoice.description = note_list[0].get('Content', None)
invoice.comment = '\n'.join([
'%(code)s%(subj)s%(msg)s' % {
'code': ('Code=%s, ' % x.get('ContentCode', ''))
if x.get('ContentCode', '') else '',
'subj': ('Subject=%s, ' % x.get('SubjectCode', ''))
if x.get('SubjectCode', '') else '',
'msg': x.get('Content', ''),
} for x in note_list[1:]])
# supplier: the key 'party' is only set if the party was found or
# created while reading the xml
seller_party = self.parsed_data.get('seller_party', None)
if seller_party:
if 'party' in seller_party.keys():
invoice.party = seller_party['party']
invoice.on_change_party()
else:
# the message lists all text values read for the party
raise UserError(gettext(
'document_incoming_invoice_xml.msg_no_supplierparty',
partytxt=', '.join([
seller_party[x].replace('\n', '; ')
for x in seller_party.keys()
if isinstance(seller_party[x], str)])))
# check if we found our company
buyer_party = self.parsed_data.get('buyer_party', None)
if buyer_party and config and not config.accept_other_company:
company_party_id = self._readxml_find_party(buyer_party)
if not (company_party_id and
(company_party_id == self.company.party.id)):
raise UserError(gettext(
'document_incoming_invoice_xml.msg_not_our_company',
partytxt=', '.join([
buyer_party[x].replace('\n', '; ')
for x in buyer_party.keys()
if isinstance(buyer_party[x], str)])))
# work on a copy: '_readxml_getinvoiceline' removes the values it
# used from the line data
lines_data = copy.deepcopy(self.parsed_data.get('lines_data', None))
if lines_data:
lines = [
self._readxml_getinvoiceline(invoice, x)
for x in lines_data]
# number the lines in the order of the xml, starting with 1
for x in range(len(lines)):
lines[x].sequence = x + 1
invoice.lines = lines
invoice.on_change_lines()
return invoice
def _readxml_facturx_minimal(self, xmltree):
    """ reject factur-x minimal

    Args:
        xmltree (xmldata): XML-tree, not used

    Raises:
        UserError: always
    """
    # deny usage of factur-x minimal because it contains no tax-info
    reason = 'factur-x minimal not supported'
    raise UserError(gettext(
        'document_incoming_invoice_xml.msg_convert_error', msg=reason))
def _readxml_facturx_basic(self, xmltree):
    """ read factur-x basic, done by the reader of factur-x extended

    Args:
        xmltree (xmldata): XML-tree
    """
    read_extended = self._readxml_facturx_extended
    read_extended(xmltree)
def _readxml_facturx_basicwl(self, xmltree):
    """ reject factur-x basic-wl

    Args:
        xmltree (xmldata): XML-tree, not used

    Raises:
        UserError: always
    """
    # deny usage of factur-x basicwl because it contains no invoice-lines
    reason = 'factur-x basic-wl not supported'
    raise UserError(gettext(
        'document_incoming_invoice_xml.msg_convert_error', msg=reason))
def _readxml_facturx_en16931(self, xmltree):
    """ read factur-x EN16931, done by the reader of factur-x extended

    Args:
        xmltree (xmldata): XML-tree
    """
    read_extended = self._readxml_facturx_extended
    read_extended(xmltree)
# Reads a CrossIndustryInvoice tree into a dict and stores it in
# 'self.parsed_data'. Keys written: 'invoice_number', 'invoice_date',
# 'note_list', 'seller_party', 'buyer_party', 'lines_data', 'payment'
# (with 'reference', 'currency', 'bank', 'taxes', 'terms') and 'total'.
# Raises UserError for an unexpected type-code or date-format.
def _readxml_facturx_extended(self, xmltree):
""" read factur-x extended
"""
Configuration = Pool().get('document.incoming.configuration')
config = Configuration.get_singleton()
# start with the values already stored in 'parsed_data', if any
p_data = {}
if hasattr(self, 'parsed_data'):
if isinstance(self.parsed_data, dict):
p_data.update(self.parsed_data)
# rsm:CrossIndustryInvoice
xpath_cross_ind = ['rsm:CrossIndustryInvoice']
# rsm:CrossIndustryInvoice/rsm:ExchangedDocument
xpath_exchg_doc = xpath_cross_ind + ['rsm:ExchangedDocument']
# check invoice-type:
# allowed codes 380 invoice, 381 credit note
inv_code = self._readxml_getvalue(
xmltree, xpath_exchg_doc + ['ram:TypeCode'], int)
if inv_code not in [380, 381]:
raise UserError(gettext(
'document_incoming_invoice_xml.msg_convert_error',
msg='invalid type-code: %(code)s (expect: 380, 381)' % {
'code': str(inv_code)}))
p_data['invoice_number'] = self._readxml_getvalue(
xmltree, xpath_exchg_doc + ['ram:ID'])
# invoice-date
date_path = xpath_exchg_doc + [
'ram:IssueDateTime', 'udt:DateTimeString']
# only format '102' is accepted, the value is then converted
# by '_readxml_convertdate'
date_format = self._readxml_getattrib(xmltree, date_path, 'format')
if date_format != '102':
raise UserError(gettext(
'document_incoming_invoice_xml.msg_convert_error',
msg='invalid date-format: %(code)s (expect: 102)' % {
'code': str(date_format)}))
p_data['invoice_date'] = self._readxml_convertdate(
self._readxml_getvalue(xmltree, date_path))
# IncludedNote, 1st line --> 'description', 2nd ff. --> 'comment'
xpath_notes = xpath_exchg_doc + ['ram:IncludedNote']
# count number of notes
num_nodes = len(xmltree.xpath(
self._readxml_xpath(xpath_notes), namespaces=xmltree.nsmap))
# read notes and their fields
# (the list selector of the xpath starts at 1)
note_list = []
for x in range(1, num_nodes + 1):
note_list.append({
y: self._readxml_getvalue(
xmltree, xpath_notes + [x, 'ram:%s' % y])
for y in ['Content', 'ContentCode', 'SubjectCode']
})
if note_list:
p_data['note_list'] = note_list
# rsm:CrossIndustryInvoice/rsm:SupplyChainTradeTransaction
xpath_suppl_chain = xpath_cross_ind + [
'rsm:SupplyChainTradeTransaction']
# rsm:CrossIndustryInvoice/
# rsm:SupplyChainTradeTransaction/
# ram:ApplicableHeaderTradeAgreement
xpath_appl_head_agree = xpath_suppl_chain + [
'ram:ApplicableHeaderTradeAgreement']
# supplier party
# (created if missing, when enabled in the configuration)
add_party = config and config.create_supplier
seller_party = self._readxml_party_data(
xmltree, xpath_appl_head_agree + ['ram:SellerTradeParty'],
create_party=add_party)
if seller_party:
p_data['seller_party'] = seller_party
# company party
# (never created, only searched)
buyer_party = self._readxml_party_data(
xmltree, xpath_appl_head_agree + ['ram:BuyerTradeParty'],
create_party=False)
if buyer_party:
p_data['buyer_party'] = buyer_party
# invoice - lines
# rsm:CrossIndustryInvoice/
# rsm:SupplyChainTradeTransaction/
# ram:IncludedSupplyChainTradeLineItem
xpath_line_item = xpath_suppl_chain + [
'ram:IncludedSupplyChainTradeLineItem']
# get number of invoice-lines
num_lines = len(xmltree.xpath(
self._readxml_xpath(xpath_line_item), namespaces=xmltree.nsmap))
lines_data = []
for x in range(1, num_lines + 1):
lines_data.append(
self._readxml_invoice_line(xmltree, xpath_line_item, x))
if lines_data:
p_data['lines_data'] = lines_data
# payment data
xpath_payment = xpath_suppl_chain + [
'ram:ApplicableHeaderTradeSettlement']
p_data['payment'] = {}
p_data['payment']['reference'] = self._readxml_getvalue(
xmltree, xpath_payment + ['ram:PaymentReference'])
p_data['payment']['currency'] = self._readxml_getvalue(
xmltree, xpath_payment + ['ram:InvoiceCurrencyCode'])
# num bank accounts
xpath_bankaccounts = xpath_payment + [
'ram:SpecifiedTradeSettlementPaymentMeans']
num_bank = len(xmltree.xpath(
self._readxml_xpath(xpath_bankaccounts), namespaces=xmltree.nsmap))
bank_accounts = []
# one dict per payment-means node, 'x + 1' is the list selector
for x in range(num_bank):
bank_account = {
'info': self._readxml_getvalue(
xmltree, xpath_bankaccounts + [x + 1, 'ram:Information']),
'type': self._readxml_getvalue(
xmltree, xpath_bankaccounts + [x + 1, 'ram:TypeCode']),
# debitor
'debitor_iban': self._readxml_getvalue(
xmltree, xpath_bankaccounts +
[x + 1, 'ram:PayerPartyDebtorFinancialAccount',
'ram:IBANID']),
# creditor
'creditor_iban': self._readxml_getvalue(
xmltree, xpath_bankaccounts +
[x + 1, 'ram:PayeePartyCreditorFinancialAccount',
'ram:IBANID']),
'creditor_name': self._readxml_getvalue(
xmltree, xpath_bankaccounts +
[x + 1, 'ram:PayeePartyCreditorFinancialAccount',
'ram:AccountName']),
# financial card
'card_id': self._readxml_getvalue(
xmltree, xpath_bankaccounts +
[x + 1, 'ram:ApplicableTradeSettlementFinancialCard',
'ram:ID']),
'card_holder_name': self._readxml_getvalue(
xmltree, xpath_bankaccounts +
[x + 1, 'ram:ApplicableTradeSettlementFinancialCard',
'ram:CardholderName']),
# bank name
'institution': self._readxml_getvalue(
xmltree, xpath_bankaccounts +
[x + 1, 'ram:PayeeSpecifiedCreditorFinancialInstitution',
'ram:BICID']),
}
# add to list of bank accounts, skip empty values
bank_accounts.append({
x: bank_account[x]
for x in bank_account.keys()
if bank_account[x]})
if bank_accounts:
p_data['payment']['bank'] = bank_accounts
# invoice-taxes
xpath_invoice_taxes = xpath_payment + ['ram:ApplicableTradeTax']
taxes = self._readxml_read_listdata(xmltree, xpath_invoice_taxes, [], [
('ram:CalculatedAmount', 'amount', Decimal),
('ram:TypeCode', 'type'),
('ram:ExemptionReason', 'reason'),
('ram:BasisAmount', 'base', Decimal),
('ram:LineTotalBasisAmount', 'basetotal', Decimal),
('ram:AllowanceChargeBasisAmount', 'allowancebase', Decimal),
('ram:CategoryCode', 'category_code'),
('ram:ExemptionReasonCode', 'reason_code'),
('ram:TaxPointDate', 'taxdate', date),
('ram:DueDateTypeCode', 'duecode'),
('ram:RateApplicablePercent', 'percent', Decimal)])
if taxes:
p_data['payment']['taxes'] = taxes
# payment terms, with the discount terms nested below each term
xpath_payterms = xpath_payment + ['ram:SpecifiedTradePaymentTerms']
xpath_discount = ['ram:ApplicableTradePaymentDiscountTerms']
pay_terms = self._readxml_read_listdata(xmltree, xpath_payterms, [], [
('ram:Description', 'description'),
(['ram:DueDateDateTime', 'udt:DateTimeString'],
'duedate', date),
('ram:DirectDebitMandateID', 'mandat_id'),
('ram:PartialPaymentAmount', 'amount', Decimal),
(xpath_discount + ['ram:BasisDateTime', 'udt:DateTimeString'],
'discount_date', date),
(xpath_discount + ['ram:BasisPeriodMeasure'],
'discount_measure', Decimal),
(xpath_discount + ['ram:BasisAmount'],
'discount_base', Decimal),
(xpath_discount + ['ram:CalculationPercent'],
'discount_perc', Decimal),
(xpath_discount + ['ram:ActualDiscountAmount'],
'discount_amount', Decimal)
])
if pay_terms:
p_data['payment']['terms'] = pay_terms
# total
xpath_total = xpath_payment + [
'ram:SpecifiedTradeSettlementHeaderMonetarySummation']
total = self._readxml_read_listdata(xmltree, xpath_total, [], [
('ram:LineTotalAmount', 'amount', Decimal),
('ram:ChargeTotalAmount', 'charge', Decimal),
('ram:AllowanceTotalAmount', 'allowance', Decimal),
('ram:TaxBasisTotalAmount', 'taxbase', Decimal),
('ram:TaxTotalAmount', 'taxtotal', Decimal),
('ram:RoundingAmount', 'rounding', Decimal),
('ram:GrandTotalAmount', 'grand', Decimal),
('ram:TotalPrepaidAmount', 'prepaid', Decimal),
('ram:DuePayableAmount', 'duepayable', Decimal),
])
# only the first summation is stored
if total:
p_data['total'] = total[0]
self.parsed_data = p_data
def _readxml_getinvoiceline(self, invoice, line_data):
    """ create invoice line in memory

    Values used to build the line are removed from 'line_data', the
    remaining values are added as JSON to the note of the line.

    Args:
        invoice (record): model account.invoice, must have a currency
        line_data (dict): values from xml to create
            invoice line, gets modified

    Raises:
        UserError: if no product category is configured, no tax matches
            a percentage of the xml, not every tax of the xml could be
            used or the calculated amount of the line differs from
            the amount in the xml

    Returns:
        record: unsaved line, model account.invoice.line
    """
    pool = Pool()
    Line = pool.get('account.invoice.line')
    Tax = pool.get('account.tax')
    Uom = pool.get('product.uom')
    Configuration = pool.get('document.incoming.configuration')
    ModelData = pool.get('ir.model.data')
    cfg1 = Configuration.get_singleton()

    def get_tax_by_percent(percent):
        """ search for tax by percent of supplier tax

        Args:
            percent (Decimal): tax percent, 7% --> 0.07

        Raises:
            UserError: if no product category is configured or
                no matching tax was found

        Returns:
            tuple: (model 'account.tax, model 'product.category')
        """
        if not (cfg1 and cfg1.product_category):
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_no_prodcat_configured'))

        for pcat in cfg1.product_category:
            for s_tax in pcat.supplier_taxes:
                # get current taxes of supplier-tax at
                # invoice-date
                current_taxes = Tax.compute(
                    [s_tax], Decimal('1'), 1.0, invoice.invoice_date)
                # skip result of multiple or none taxes
                if len(current_taxes) != 1:
                    continue
                current_tax = current_taxes[0]['tax']
                if (current_tax.rate == percent) and (
                        current_tax.type == 'percentage'):
                    # found it
                    return (s_tax, pcat)

        # no product category found
        raise UserError(gettext(
            'document_incoming_invoice_xml.msg_no_prodcat_found',
            percent=percent * Decimal('100.0')))

    # uom: 'unit' unless the unit-code of the xml is known
    xml_uom = Uom(ModelData.get_id('product', 'uom_unit'))
    unit_code = line_data.get('quantity', {}).pop('unit_code', None)
    if unit_code:
        units = Uom.search([('unece_code', '=', unit_code)])
        if units:
            xml_uom = units[0]

    line = Line(
        invoice=invoice,
        type='line',
        unit=xml_uom,
        quantity=line_data.get('quantity', {}).pop('billed', None),
        unit_price=line_data.get('unit_net_price', {}).pop('amount', None))
    line_no = line_data.pop('line_no', None)

    # description
    descr = [
        '[%s]' % line_no if line_no else None,
        line_data.pop('name', None),
        line_data.pop('description', None)]
    line.description = '; '.join([x for x in descr if x])

    # get taxes and product-category from settings
    tax_and_category = []
    line_taxes = line_data.pop('taxes', [])
    for line_tax in line_taxes:
        percent = line_tax.get('percent', None)
        if (line_tax.get('type', '') == 'VAT') and (percent is not None):
            percent = percent / Decimal('100')
            tax_and_category.append(get_tax_by_percent(percent))

    # check result: every tax of the xml must have been used
    if len(line_taxes) != len(tax_and_category):
        raise UserError(gettext(
            'document_incoming_invoice_xml.msg_numtaxes_invalid',
            descr=line.description,
            taxin='|'.join([
                str(x.get('percent', '-')) for x in line_taxes]),
            taxout='|'.join([
                str(x[0].rate * Decimal('100.0'))
                for x in tax_and_category])))

    # set taxes and account for product
    line.taxes = [x[0] for x in tax_and_category]
    expense_accounts = [
        x[1].account_expense
        for x in tax_and_category if x[1].account_expense]
    if expense_accounts:
        line.account = expense_accounts[0]

    # check if calculated 'amount' matches with amount from xml-data
    assert invoice.currency is not None
    xml_amount = line_data.get('total', {}).pop('amount', None)
    if xml_amount is not None:
        calc_amount = line.get_amount(None)
        if xml_amount != calc_amount:
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_line_amount_invalid',
                linetxt=line.description,
                calcsum=Report.format_currency(
                    calc_amount, None, invoice.currency),
                xmlsum=Report.format_currency(
                    xml_amount, None, invoice.currency)))

    # cleanup used values; a key can be missing completely (e.g. no
    # net price in the xml), so pop() is used instead of del
    for key in ['quantity', 'unit_net_price', 'total']:
        if not line_data.get(key, {}):
            line_data.pop(key, None)

    # note of line
    notes = [line_data.pop('line_note', None)]

    # skipped xml-data
    convert_notes = line_data.pop('convert_note', None)
    if convert_notes:
        if isinstance(convert_notes, list):
            notes.extend(
                [' '] +
                [gettext('document_incoming_invoice_xml.msg_convert_note')] +
                convert_notes)
        else:
            notes.append(str(convert_notes))

    # values not used to create invoice-line
    if line_data:
        notes.extend([
            ' ',
            gettext(
                'document_incoming_invoice_xml.msg_unused_linevalues'),
            json.dumps(line_data, cls=JSONEncoder, indent=3)])
    line.note = '\n'.join(x for x in notes if x)

    line.on_change_invoice()
    return line
def _readxml_read_listdata(self, xmldata, xpth, xtag, key_list):
    """ read a list of nodes from xml, one dict of values per node

    Args:
        xmldata (xtree): xml-tree
        xpth (list): list of xml-tags until the list node
        xtag (str or list): xml-tag(s) of the list node, added
            to 'xpth'
        key_list (list): fields to read below each list node:
            [('<xtag>', '<key>'), ('<xtag>', '<key>', <type>)],
            '<xtag>' may be a list of tags

    Returns:
        list: list of dict, values of None and nodes without
            values are left out
    """
    list_path = xpth + ([xtag] if isinstance(xtag, str) else xtag)
    item_count = len(xmldata.xpath(
        self._readxml_xpath(list_path), namespaces=xmldata.nsmap))

    listdata = []
    # the list selector of the xpath starts at 1
    for pos in range(1, item_count + 1):
        item_path = list_path + [pos]
        values = {}
        for entry in key_list:
            # entries without type are read as text
            if len(entry) != 3:
                entry = tuple(list(entry) + [None])
            xkey, key, xtype = entry

            sub_path = xkey if isinstance(xkey, list) else [xkey]
            value = self._readxml_getvalue(
                xmldata, item_path + sub_path, vtype=xtype)
            if value is not None:
                values[key] = value
        if values:
            listdata.append(values)
    return listdata
def _readxml_invoice_line(self, xmldata, xpth, pos):
    """ read invoice-line from xml

    Nodes which are present in the xml but not read are listed
    as 'skip: <xpath>' in 'convert_note'.

    Args:
        xmldata (): xml-tree object
        xpth (list): xpath to invoice-line
        pos (int): number of line to read, used as list selector
            of the xpath

    Returns:
        dict: invoice line data, empty values are left out
    """
    xpath_line = xpth + [pos]
    result = {'convert_note': []}

    def node_exists(tags):
        """ check if the xml has a node at the xpath
        """
        # the text of a node cannot be used for this check: a node
        # with child nodes has no text if the xml is written
        # without whitespace between the tags
        return bool(xmldata.xpath(
            self._readxml_xpath(tags), namespaces=xmldata.nsmap))

    def first_item(xtag, key_list):
        """ first item of list-data below the line, {} if no
            values were read
        """
        items = self._readxml_read_listdata(
            xmldata, xpath_line, xtag, key_list)
        return items[0] if items else {}

    def note_skipped(xpath_parent, names):
        """ add a note for each existing but ignored node
        """
        for name in names:
            xp_to_check = xpath_parent + ['ram:' + name]
            if node_exists(xp_to_check):
                result['convert_note'].append(
                    'skip: ' + self._readxml_xpath(xp_to_check))

    result['line_no'] = self._readxml_getvalue(xmldata, xpath_line + [
        'ram:AssociatedDocumentLineDocument', 'ram:LineID'])
    # a note without text is read as None and left out
    line_notes = self._readxml_getvalue(
        xmldata, xpath_line + [
            'ram:AssociatedDocumentLineDocument', 'ram:IncludedNote',
            'ram:Content'], allow_list=True)
    result['line_note'] = '\n'.join(
        x for x in line_notes if x is not None)

    for xkey, key in [
            ('ram:ID', 'prod_id'),
            ('ram:GlobalID', 'glob_id'),
            ('ram:SellerAssignedID', 'seller_id'),
            ('ram:BuyerAssignedID', 'buyer_id'),
            ('ram:IndustryAssignedID', 'industy_id'),
            ('ram:ModelID', 'model_id'),
            ('ram:Name', 'name'),
            ('ram:Description', 'description'),
            ('ram:BatchID', 'lot'),
            ('ram:BrandName', 'brand_name'),
            ('ram:ModelName', 'model_name'),
            (['ram:OriginTradeCountry', 'ram:ID'], 'trade_country')
            ]:
        result[key] = self._readxml_getvalue(xmldata, xpath_line + [
            'ram:SpecifiedTradeProduct'] +
            (xkey if isinstance(xkey, list) else [xkey]))

    # attributes of product
    result['attributes'] = self._readxml_read_listdata(
        xmldata, xpath_line, [
            'ram:SpecifiedTradeProduct',
            'ram:ApplicableProductCharacteristic'], [
            ('ram:TypeCode', 'code'),
            ('ram:Description', 'description'),
            ('ram:ValueMeasure', 'value', Decimal),
            ('ram:Value', 'uom')])

    # classification of product
    result['classification'] = self._readxml_read_listdata(
        xmldata, xpath_line, [
            'ram:SpecifiedTradeProduct',
            'ram:DesignatedProductClassification'], [
            ('ram:ClassCode', 'code'),
            ('ram:ClassName', 'name')])

    # serial-numbers of product
    result['serialno'] = self._readxml_read_listdata(
        xmldata, xpath_line, [
            'ram:SpecifiedTradeProduct',
            'ram:IndividualTradeProductInstance'], [
            ('ram:BatchID', 'lot'),
            ('ram:SupplierAssignedSerialID', 'serial')])

    # referenced product
    result['refprod'] = self._readxml_read_listdata(
        xmldata, xpath_line,
        ['ram:SpecifiedTradeProduct', 'ram:IncludedReferencedProduct'], [
            ('ram:ID', 'id'),
            ('ram:GlobalID', 'global_id'),
            ('ram:SellerAssignedID', 'seller_id'),
            ('ram:BuyerAssignedID', 'buyer_id'),
            ('ram:Name', 'name'),
            ('ram:Description', 'description'),
            ('ram:UnitQuantity', 'quantity', Decimal),
            ])

    # net price, then gross price
    price_keys = [
        ('ram:ChargeAmount', 'amount', Decimal),
        ('ram:BasisQuantity', 'basequantity', Decimal)]
    for price_tag, key in [
            ('ram:NetPriceProductTradePrice', 'unit_net_price'),
            ('ram:GrossPriceProductTradePrice', 'unit_gross_price')]:
        price_tags = ['ram:SpecifiedLineTradeAgreement', price_tag]
        if node_exists(xpath_line + price_tags):
            result[key] = first_item(price_tags, price_keys)
            # notice ignored field
            note_skipped(
                xpath_line + price_tags, ['AppliedTradeAllowanceCharge'])

    # quantity
    xpath_quantity = xpath_line + ['ram:SpecifiedLineTradeDelivery']
    result['quantity'] = first_item(
        ['ram:SpecifiedLineTradeDelivery'], [
            ('ram:BilledQuantity', 'billed', Decimal),
            ('ram:ChargeFreeQuantity', 'chargefree', Decimal),
            ('ram:PackageQuantity', 'package', Decimal),
            ])
    result['quantity']['unit_code'] = self._readxml_getattrib(
        xmldata, xpath_quantity + ['ram:BilledQuantity'], 'unitCode')
    # notice ignored fields
    note_skipped(xpath_quantity, [
        'ShipToTradeParty', 'UltimateShipToTradeParty',
        'ActualDeliverySupplyChainEvent',
        'DespatchAdviceReferencedDocument',
        'ReceivingAdviceReferencedDocument',
        'DeliveryNoteReferencedDocument'])

    # taxes
    xpath_trade = xpath_line + ['ram:SpecifiedLineTradeSettlement']
    result['taxes'] = self._readxml_read_listdata(xmldata, xpath_line, [
        'ram:SpecifiedLineTradeSettlement',
        'ram:ApplicableTradeTax'], [
        ('ram:CalculatedAmount', 'amount', Decimal),
        ('ram:TypeCode', 'type'),
        ('ram:ExemptionReason', 'reason'),
        ('ram:BasisAmount', 'base', Decimal),
        ('ram:LineTotalBasisAmount', 'basetotal', Decimal),
        ('ram:AllowanceChargeBasisAmount', 'allowancebase', Decimal),
        ('ram:CategoryCode', 'category_code'),
        ('ram:ExemptionReasonCode', 'reason_code'),
        ('ram:TaxPointDate', 'taxdate', date),
        ('ram:DueDateTypeCode', 'duecode'),
        ('ram:RateApplicablePercent', 'percent', Decimal)])

    # total amounts
    result['total'] = first_item([
        'ram:SpecifiedLineTradeSettlement',
        'ram:SpecifiedTradeSettlementLineMonetarySummation'], [
        ('ram:LineTotalAmount', 'amount', Decimal),
        ('ram:ChargeTotalAmount', 'charge', Decimal),
        ('ram:AllowanceTotalAmount', 'fee', Decimal),
        ('ram:TaxTotalAmount', 'tax', Decimal),
        ('ram:GrandTotalAmount', 'grand', Decimal),
        ('ram:TotalAllowanceChargeAmount', 'feecharge', Decimal),
        ])
    # notice ignored fields
    note_skipped(xpath_trade, [
        'BillingSpecifiedPeriod', 'SpecifiedTradeAllowanceCharge',
        'ActualDeliverySupplyChainEvent',
        'InvoiceReferencedDocument',
        'AdditionalReferencedDocument',
        'ReceivableSpecifiedTradeAccountingAccount'])

    # skip None values and other empty values
    return {key: value for key, value in result.items() if value}
def _readxml_convertdate(self, date_string):
    """ convert date-string to python-date

    Args:
        date_string (str): date in format YYYYMMDD

    Raises:
        UserError: if the string is not a date in this format

    Returns:
        date: converted date, None if 'date_string' is empty
    """
    if date_string:
        try:
            parsed = datetime.strptime(date_string, '%Y%m%d')
        except ValueError:
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_convert_error',
                msg='cannot convert %(val)s' % {
                    'val': date_string}))
        return parsed.date()
    return None
def _facturx_detect_content(self):
    """ check xml-data against the xsd-files listed in 'xml_types',
        in the order of this list

    The xml is taken from 'data' directly or extracted from the pdf.
    If flavour and level of a factur-x document are known, only the
    matching xsd is checked and a validation error is raised.
    The name of the detected xsd is stored in 'xsd_type'.

    Raises:
        UserError: if the content-type is not supported, the xml
            cannot be parsed or extracted, or a known factur-x
            document is not valid

    Returns:
        tuple: ('xsd type', '<function to read xml>', <xml etree>)
            defaults to None if not detected
    """
    if self.mime_type == 'application/xml':
        try:
            xml_data = etree.fromstring(self.data)
        except etree.XMLSyntaxError as e1:
            # report broken xml the same way as a broken pdf
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_convert_error',
                msg=str(e1))) from e1
    elif self.mime_type == 'application/pdf':
        try:
            (xml_filename, xml_bytes) = facturx.get_xml_from_pdf(
                self.data, check_xsd=True)
            xml_data = etree.fromstring(xml_bytes)
        except Exception as e1:
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_convert_error',
                msg=str(e1))) from e1
    else:
        raise UserError(gettext(
            'document_incoming_invoice_xml.msg_convert_error',
            msg='content-type "%(mime)s" not supported' % {
                'mime': self.mime_type}))

    fx_flavour = facturx.get_flavor(xml_data)
    fx_level = facturx.get_level(xml_data)
    known_facturx = (fx_flavour == 'factur-x') and fx_level
    module_dir = os.path.split(__file__)[0]

    for xsdpath, xsdtype, funcname in xml_types:
        if known_facturx:
            type_name = xsdtype.lower()
            if not (type_name.startswith('factur-x') and
                    type_name.endswith(fx_level)):
                # skip check if xml-content is already known
                continue

        fname = os.path.join(module_dir, *xsdpath)
        schema = etree.XMLSchema(etree.parse(fname))
        try:
            schema.assertValid(xml_data)
        except etree.DocumentInvalid as e1:
            # fire exception for known xml-content
            if known_facturx:
                raise UserError(gettext(
                    'document_incoming_invoice_xml.msg_convert_error',
                    msg='%(name)s [%(flavour)s|%(level)s] %(msg)s' % {
                        'name': self.name or '-',
                        'flavour': fx_flavour,
                        'level': fx_level,
                        'msg': str(e1)})) from e1
            # unknown content: try the next xsd
            continue

        self.xsd_type = (
            xsdtype if self.mime_type == 'application/xml'
            else 'PDF + ' + xsdtype)
        return (xsdtype, funcname, xml_data)
    return None
# end Incoming