# -*- coding: utf-8 -*-
# This file is part of the document-incoming-invoice-xml-module
# from m-ds for Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.

# https://portal3.gefeg.com/projectdata/invoice/deliverables/installed/publishingproject/xrechnung%20%28german%20cius%29/xrechnung_crossindustryinvoice%3B%202017-10-18.scm/html/021.htm?https://portal3.gefeg.com/projectdata/invoice/deliverables/installed/publishingproject/xrechnung%20%28german%20cius%29/xrechnung_crossindustryinvoice%3B%202017-10-18.scm/html/025.htm

import copy
import os.path
import json
from lxml import etree
from datetime import datetime, date
from decimal import Decimal
from trytond.pool import PoolMeta, Pool
from trytond.report import Report
from trytond.transaction import Transaction
from trytond.exceptions import UserError
from trytond.i18n import gettext
from trytond.model import fields
from trytond.pyson import Eval
from trytond.protocols.jsonrpc import JSONEncoder


# Known XML schemas, checked in this order (most specific first).
# Each entry: (path parts of the xsd below this module's directory,
#              human readable type name,
#              suffix of the '_readxml_<suffix>' reader method or '').
# Path parts must be relative: an absolute part would make os.path.join()
# drop the module directory.
xml_types = [
    (['xsd', 'Factur-X_1.07.2_EXTENDED', 'Factur-X_1.07.2_EXTENDED.xsd'],
        'Factur-X extended', 'facturx_extended'),
    (['xsd', 'Factur-X_1.07.2_EN16931', 'Factur-X_1.07.2_EN16931.xsd'],
        'Factur-X EN16931', ''),
    (['xsd', 'Factur-X_1.07.2_BASIC', 'Factur-X_1.07.2_BASIC.xsd'],
        'Factur-X basic', ''),
    (['xsd', 'Factur-X_1.07.2_BASICWL', 'Factur-X_1.07.2_BASICWL.xsd'],
        'Factur-X basic-wl', ''),
    (['xsd', 'Factur-X_1.07.2_MINIMUM', 'Factur-X_1.07.2_MINIMUM.xsd'],
        'Factur-X minimum', ''),
    (['xsd', 'CII D22B XSD', 'CrossIndustryInvoice_100pD22B.xsd'],
        'CrossIndustryInvoice D22', ''),
    (['xsd', 'os-UBL-2.1', 'xsd', 'maindoc', 'UBL-Invoice-2.1.xsd'],
        'XRechnung - Invoice', ''),
    (['xsd', 'os-UBL-2.1', 'xsd', 'maindoc', 'UBL-CreditNote-2.1.xsd'],
        'XRechnung - Credit Note', ''),
    (['xsd', 'os-UBL-2.1', 'xsd', 'maindoc', 'UBL-DebitNote-2.1.xsd'],
        'XRechnung - Debit Note', '')]


class Incoming(metaclass=PoolMeta):
    __name__ = 'document.incoming'

    xsd_type = fields.Char(
        string='XML Content', readonly=True,
        states={'invisible': Eval('mime_type', '') != 'application/xml'})

    @classmethod
    def default_company(cls):
        """ get id of company from the context of the transaction
        """
        return Transaction().context.get('company')

    def _process_supplier_invoice(self):
        """ try to detect content of 'data', read values

        Returns:
            record: model account.invoice

        Raises:
            UserError: if the xml-type was detected but no reader
                method '_readxml_<funcname>' exists for it
        """
        invoice = super()._process_supplier_invoice()

        self.xsd_type = None
        if self.mime_type == 'application/xml':
            # detect xml-content
            xml_info = self._facturx_detect_content()
            if xml_info:
                (xsd_type, funcname, xmltree) = xml_info
                self.xsd_type = xsd_type

                xml_read_func = getattr(
                    self, '_readxml_%s' % funcname, None)
                if not xml_read_func:
                    raise UserError(gettext(
                        'document_incoming_invoice_xml.msg_not_implemented',
                        xmltype=xsd_type))
                # read xml data, write to 'self.parsed_data'
                xml_read_func(xmltree)
                # update invoice with imported data
                invoice = self._readxml_update_invoice(invoice)
        return invoice

    def _readxml_xpath(self, tags):
        """ generate xpath

        Args:
            tags (list): list of string or integer to build path,
                an integer is a 1-based position selector for the
                tag in front of it

        Returns:
            str: absolute xpath

        Raises:
            ValueError: if a selector has no tag in front of it or
                a tag gets more than one selector
        """
        parts = []
        for x in tags:
            if isinstance(x, str):
                parts.append(x)
            elif isinstance(x, int):
                if not parts:
                    raise ValueError('list selector without tag')
                if parts[-1].endswith(']'):
                    raise ValueError('multiple list selector')
                parts[-1] += '[%d]' % x
        return '/' + '/'.join(parts)

    def _readxml_nodes(self, xmltree, tags):
        """ get list of nodes matching the xpath built from 'tags'

        Args:
            xmltree (xmldata): XML-tree
            tags (list): list of tags to build xpath

        Returns:
            list: matching nodes, empty if nothing was found
        """
        # xpath does not accept the default namespace (prefix None)
        namespaces = {
            prefix: uri
            for prefix, uri in xmltree.nsmap.items() if prefix}
        return xmltree.xpath(
            self._readxml_xpath(tags), namespaces=namespaces)

    def _readxml_getvalue(
            self, xmltree, tags, vtype=None, allow_list=False):
        """ read 'text'-part from xml-xpath, convert to 'vtype'

        Args:
            xmltree (xmldata): XML-tree
            tags (list): list of tags to build xpath
            vtype (type-class, optional): to convert value of text-part
                Defaults to None.
            allow_list (boolean, optional): get result as list of values,
                Defaults to False.

        Returns:
            various: converted value or None, a list of converted
                values if 'allow_list' is set
        """
        def convert(text):
            """ convert text of node, nodes without text stay None
            """
            if (vtype is None) or (text is None):
                return text
            if vtype == date:
                return self._readxml_convertdate(text)
            return vtype(text)

        nodes = self._readxml_nodes(xmltree, tags)
        if allow_list:
            return [convert(node1.text) for node1 in nodes]
        # only the 1st node is read and converted
        return convert(nodes[0].text) if nodes else None

    def _readxml_getattrib(self, xmltree, tags, attrib, vtype=None):
        """ read attribute from xml-xpath, convert to 'vtype'

        Args:
            xmltree (xmldata): XML-tree
            tags (list): list of tags to build xpath
            attrib (str): name of attribute to read
            vtype (type-class, optional): to convert value.
                Defaults to None.

        Returns:
            various: converted value or None
        """
        result = None
        nodes = self._readxml_nodes(xmltree, tags)
        if nodes:
            result = nodes[0].attrib.get(attrib, None)
            if result and vtype:
                result = vtype(result)
        return result

    def _readxml_find_party(self, party_data):
        """ find party by search with data from xml-file

        Args:
            party_data (dict): data of party read from xml

        Returns:
            int: id of party or None
        """
        pool = Pool()
        Party = pool.get('party.party')
        Address = pool.get('party.address')

        name = party_data.get('name')
        if not name:
            return None

        query = [('name', 'ilike', '%%%(name)s%%' % {'name': name})]
        # the address is used only if it is complete
        if {'postal_code', 'street', 'city'} <= set(party_data.keys()):
            # ignore capitalization
            address_sql = Address.search([
                    ('city', 'ilike', party_data['city']),
                    ('postal_code', '=', party_data['postal_code']),
                    ('street', 'ilike',
                        party_data['street'].split('\n')[0] + '%')],
                query=True)
            query.append(('addresses', 'in', address_sql))

        party = Party.search(query)
        # we use this party if its a exact match
        if len(party) == 1:
            return party[0].id
        return None

    def _readxml_create_party(self, partydata):
        """ create party with data from xml

        Args:
            partydata (dict): data of party, read from xml

        Returns:
            int: id of created party, None if there is no name
        """
        pool = Pool()
        Party = pool.get('party.party')
        Invoice = pool.get('account.invoice')
        Company = pool.get('company.company')

        if not partydata.get('name'):
            return None

        to_create = {'name': partydata['name']}
        address = {
            x: partydata[x]
            for x in ['street', 'postal_code', 'city',
                      'country', 'subdivision']
            if x in partydata}

        # if no country, we use the company-country
        if 'country' not in address:
            company = Invoice.default_company()
            if company:
                company = Company(company)
                company_address = company.party.address_get()
                if company_address and company_address.country:
                    address['country'] = company_address.country.id

        if address:
            to_create['addresses'] = [('create', [address])]
        party, = Party.create([to_create])
        return party.id

    def _readxml_party_data(self, xmltree, tags, create_party=False):
        """ read party data

        Args:
            xmltree (xmldata): XML-tree
            tags (list): tags to build xpath
            create_party (boolean, optional): create party if not found

        Returns:
            dict: data of party, 'name' is always included (may be None),
                other keys only if a value was found
        """
        pool = Pool()
        Country = pool.get('country.country')
        SubDivision = pool.get('country.subdivision')

        xpath_address = tags + ['ram:PostalTradeAddress']

        # name
        result = {
            'name': self._readxml_getvalue(xmltree, tags + ['ram:Name'])}

        postal_code = self._readxml_getvalue(
            xmltree, xpath_address + ['ram:PostcodeCode'])
        if postal_code:
            result['postal_code'] = postal_code

        # address, max. 3 lines
        street_lines = [
            self._readxml_getvalue(xmltree, xpath_address + [x])
            for x in ['ram:LineOne', 'ram:LineTwo', 'ram:LineThree']]
        street = '\n'.join([x for x in street_lines if x])
        if street:
            result['street'] = street

        # city
        city = self._readxml_getvalue(
            xmltree, xpath_address + ['ram:CityName'])
        if city:
            result['city'] = city

        # country
        country_code = self._readxml_getvalue(
            xmltree, xpath_address + ['ram:CountryID'])
        if country_code:
            country = Country.search([('code', '=', country_code.upper())])
            if country:
                result['country'] = country[0].id

        # subdivision
        subdivision = self._readxml_getvalue(
            xmltree, xpath_address + ['ram:CountrySubDivisionName'])
        if subdivision and ('country' in result):
            subdiv = SubDivision.search([
                ('name', '=', subdivision),
                ('country', '=', result['country'])])
            if subdiv:
                result['subdivision'] = subdiv[0].id

        party_id = self._readxml_find_party(result)
        if (not party_id) and create_party:
            party_id = self._readxml_create_party(result)
        if party_id:
            result['party'] = party_id
        return result

    def _readxml_party_text(self, party_data):
        """ build text of party data to show it in a message

        Args:
            party_data (dict): data of party read from xml

        Returns:
            str: text values of the party, record ids and
                empty values are skipped
        """
        return ', '.join([
            x.replace('\n', '; ')
            for x in party_data.values()
            if isinstance(x, str) and x])

    def _readxml_update_invoice(self, invoice):
        """ update invoice with parsed_data

        Args:
            invoice (record): model account.invoice

        Returns:
            record: model account.invoice

        Raises:
            UserError: if the supplier was not found or the buyer is
                not our company
        """
        Configuration = Pool().get('document.incoming.configuration')

        config = Configuration.get_singleton()

        if config and config.number_target == 'number':
            invoice.number = self.parsed_data.get('invoice_number', None)
        else:
            invoice.reference = self.parsed_data.get('invoice_number', None)
        invoice.invoice_date = self.parsed_data.get('invoice_date', None)

        # 1st note --> 'description', 2nd ff. --> 'comment'
        note_list = self.parsed_data.get('note_list', None)
        if note_list:
            invoice.description = note_list[0].get('Content', None)
            invoice.comment = '\n'.join([
                '%(code)s%(subj)s%(msg)s' % {
                    'code': ('Code=%s, ' % x.get('ContentCode', ''))
                    if x.get('ContentCode', '') else '',
                    'subj': ('Subject=%s, ' % x.get('SubjectCode', ''))
                    if x.get('SubjectCode', '') else '',
                    # the key exists with value None for empty notes
                    'msg': x.get('Content', '') or '',
                    } for x in note_list[1:]])

        seller_party = self.parsed_data.get('seller_party', None)
        if seller_party:
            if 'party' in seller_party:
                invoice.party = seller_party['party']
                invoice.on_change_party()
            else:
                raise UserError(gettext(
                    'document_incoming_invoice_xml.msg_no_supplierparty',
                    partytxt=self._readxml_party_text(seller_party)))

        # check if we found our company
        buyer_party = self.parsed_data.get('buyer_party', None)
        if buyer_party and config and not config.accept_other_company:
            company_party_id = self._readxml_find_party(buyer_party)
            if not (company_party_id and
                    (company_party_id == self.company.party.id)):
                raise UserError(gettext(
                    'document_incoming_invoice_xml.msg_not_our_company',
                    partytxt=self._readxml_party_text(buyer_party)))

        # work on a copy, creating the lines consumes the values
        lines_data = copy.deepcopy(self.parsed_data.get('lines_data', None))
        if lines_data:
            lines = [
                self._readxml_getinvoiceline(invoice, x)
                for x in lines_data]
            for pos, line in enumerate(lines, start=1):
                line.sequence = pos
            invoice.lines = lines
            invoice.on_change_lines()
        return invoice

    def _readxml_facturx_extended(self, xmltree):
        """ read factur-x extended, store values in 'self.parsed_data'

        Args:
            xmltree (xmldata): XML-tree

        Raises:
            UserError: if type-code of invoice or format of
                invoice-date is not supported
        """
        Configuration = Pool().get('document.incoming.configuration')

        config = Configuration.get_singleton()

        if not isinstance(getattr(self, 'parsed_data', None), dict):
            self.parsed_data = {}

        # rsm:CrossIndustryInvoice
        xpath_cross_ind = ['rsm:CrossIndustryInvoice']

        # rsm:CrossIndustryInvoice/rsm:ExchangedDocument
        xpath_exchg_doc = xpath_cross_ind + ['rsm:ExchangedDocument']

        # check invoice-type:
        # allowed codes 380 invoice, 381 credit note
        inv_code = self._readxml_getvalue(
            xmltree, xpath_exchg_doc + ['ram:TypeCode'])
        try:
            inv_code = int(inv_code)
        except (TypeError, ValueError):
            # missing or non-numeric code, reported below
            pass
        if inv_code not in [380, 381]:
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_convert_error',
                msg='invalid type-code: %(code)s (expect: 380, 381)' % {
                    'code': str(inv_code)}))

        self.parsed_data['invoice_number'] = self._readxml_getvalue(
            xmltree, xpath_exchg_doc + ['ram:ID'])

        # invoice-date
        date_path = xpath_exchg_doc + [
            'ram:IssueDateTime', 'udt:DateTimeString']
        date_format = self._readxml_getattrib(xmltree, date_path, 'format')
        if date_format != '102':
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_convert_error',
                msg='invalid date-format: %(code)s (expect: 102)' % {
                    'code': str(date_format)}))
        self.parsed_data['invoice_date'] = self._readxml_convertdate(
            self._readxml_getvalue(xmltree, date_path))

        # IncludedNote, 1st line --> 'description', 2nd ff. --> 'comment'
        xpath_notes = xpath_exchg_doc + ['ram:IncludedNote']
        # count number of notes
        num_nodes = len(self._readxml_nodes(xmltree, xpath_notes))
        # read notes and their fields
        note_list = [
            {
                y: self._readxml_getvalue(
                    xmltree, xpath_notes + [x, 'ram:%s' % y])
                for y in ['Content', 'ContentCode', 'SubjectCode']}
            for x in range(1, num_nodes + 1)]
        if note_list:
            self.parsed_data['note_list'] = note_list

        # rsm:CrossIndustryInvoice/rsm:SupplyChainTradeTransaction
        xpath_suppl_chain = xpath_cross_ind + [
            'rsm:SupplyChainTradeTransaction']
        # rsm:CrossIndustryInvoice/
        #   rsm:SupplyChainTradeTransaction/
        #   ram:ApplicableHeaderTradeAgreement
        xpath_appl_head_agree = xpath_suppl_chain + [
            'ram:ApplicableHeaderTradeAgreement']

        # supplier party
        add_party = bool(config and config.create_supplier)
        seller_party = self._readxml_party_data(
            xmltree, xpath_appl_head_agree + ['ram:SellerTradeParty'],
            create_party=add_party)
        if seller_party:
            self.parsed_data['seller_party'] = seller_party

        # company party
        buyer_party = self._readxml_party_data(
            xmltree, xpath_appl_head_agree + ['ram:BuyerTradeParty'],
            create_party=False)
        if buyer_party:
            self.parsed_data['buyer_party'] = buyer_party

        # invoice - lines
        # rsm:CrossIndustryInvoice/
        #   rsm:SupplyChainTradeTransaction/
        #   ram:IncludedSupplyChainTradeLineItem
        xpath_line_item = xpath_suppl_chain + [
            'ram:IncludedSupplyChainTradeLineItem']
        # get number of invoice-lines
        num_lines = len(self._readxml_nodes(xmltree, xpath_line_item))
        lines_data = [
            self._readxml_invoice_line(xmltree, xpath_line_item, x)
            for x in range(1, num_lines + 1)]
        if lines_data:
            self.parsed_data['lines_data'] = lines_data

        # payment data
        xpath_payment = xpath_suppl_chain + [
            'ram:ApplicableHeaderTradeSettlement']
        self.parsed_data['payment'] = {
            'reference': self._readxml_getvalue(
                xmltree, xpath_payment + ['ram:PaymentReference']),
            'currency': self._readxml_getvalue(
                xmltree, xpath_payment + ['ram:InvoiceCurrencyCode']),
            }

        # num bank accounts
        xpath_bankaccounts = xpath_payment + [
            'ram:SpecifiedTradeSettlementPaymentMeans']
        num_bank = len(self._readxml_nodes(xmltree, xpath_bankaccounts))
        # key of result --> tags below the payment-means
        bank_fields = [
            ('info', ['ram:Information']),
            ('type', ['ram:TypeCode']),
            # debitor
            ('debitor_iban', [
                'ram:PayerPartyDebtorFinancialAccount', 'ram:IBANID']),
            # creditor
            ('creditor_iban', [
                'ram:PayeePartyCreditorFinancialAccount', 'ram:IBANID']),
            ('creditor_name', [
                'ram:PayeePartyCreditorFinancialAccount',
                'ram:AccountName']),
            # financial card
            ('card_id', [
                'ram:ApplicableTradeSettlementFinancialCard', 'ram:ID']),
            ('card_holder_name', [
                'ram:ApplicableTradeSettlementFinancialCard',
                'ram:CardholderName']),
            # bank name
            ('institution', [
                'ram:PayeeSpecifiedCreditorFinancialInstitution',
                'ram:BICID']),
            ]
        bank_accounts = []
        for x in range(1, num_bank + 1):
            bank_account = {
                key: self._readxml_getvalue(
                    xmltree, xpath_bankaccounts + [x] + sub_tags)
                for key, sub_tags in bank_fields}
            # add to list of bank accounts, skip empty values
            bank_accounts.append({
                key: value
                for key, value in bank_account.items() if value})
        if bank_accounts:
            self.parsed_data['payment']['bank'] = bank_accounts

    def _readxml_getinvoiceline(self, invoice, line_data):
        """ create invoice line in memory

        Args:
            invoice (record): model account.invoice
            line_data (dict): values from xml to create invoice line,
                used values are removed from the dict

        Returns:
            record: model account.invoice.line

        Raises:
            UserError: if taxes cannot be assigned or the amount of
                the line does not match the amount in the xml-data
        """
        pool = Pool()
        Line = pool.get('account.invoice.line')
        Tax = pool.get('account.tax')
        Uom = pool.get('product.uom')
        Configuration = pool.get('document.incoming.configuration')
        ModelData = pool.get('ir.model.data')
        cfg1 = Configuration.get_singleton()

        def get_tax_by_percent(percent):
            """ search for tax by percent of supplier tax

            Args:
                percent (Decimal): tax percent, 7% --> 0.07

            Raises:
                UserError: if no product category is configured or
                    no matching tax was found

            Returns:
                tuple: (model 'account.tax, model 'product.category')
            """
            if not (cfg1 and cfg1.product_category):
                raise UserError(gettext(
                    'document_incoming_invoice_xml.msg_no_prodcat_configured'))

            for pcat in cfg1.product_category:
                for s_tax in pcat.supplier_taxes:
                    # get current taxes of supplier-tax at
                    # invoice-date
                    current_taxes = Tax.compute(
                        [s_tax], Decimal('1'), 1.0, invoice.invoice_date)
                    # skip result of multiple or none taxes
                    if len(current_taxes) != 1:
                        continue
                    if (current_taxes[0]['tax'].rate == percent) and (
                            current_taxes[0]['tax'].type == 'percentage'):
                        # found it
                        return (s_tax, pcat)

            # no product category found
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_no_prodcat_found',
                percent=percent * Decimal('100.0')))

        # uom, the last known unit of the attributes is used
        xml_uom = Uom(ModelData.get_id('product', 'uom_unit'))
        for x_attr in line_data.get('attributes', []):
            x_uom = x_attr.get('uom', None)
            if not x_uom:
                continue
            units = Uom.search([('symbol', '=', x_uom)])
            if units:
                xml_uom = units[0]

        line = Line(
            invoice=invoice,
            type='line',
            unit=xml_uom,
            quantity=line_data.get('quantity', {}).pop('billed', None),
            unit_price=line_data.get(
                'unit_net_price', {}).pop('amount', None))

        line_no = line_data.pop('line_no', None)
        # description
        descr = [
            '[%s]' % line_no if line_no else None,
            line_data.pop('name', None),
            line_data.pop('description', None)]
        line.description = '; '.join([x for x in descr if x])

        # get taxes and product-category from settings
        tax_and_category = []
        line_taxes = line_data.pop('taxes', [])
        for x in line_taxes:
            percent = x.get('percent', None)
            if (x.get('type', '') == 'VAT') and (percent is not None):
                percent = percent / Decimal('100')
                tax_and_category.append(get_tax_by_percent(percent))

        # check result
        if len(line_taxes) != len(tax_and_category):
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_numtaxes_invalid',
                descr=line.description,
                taxin='|'.join([
                    str(x.get('percent', '-')) for x in line_taxes]),
                taxout='|'.join([
                    str(x[0].rate * Decimal('100.0'))
                    for x in tax_and_category])))

        # set taxes and account for product
        line.taxes = [x[0] for x in tax_and_category]
        expense_accounts = [
            x[1].account_expense
            for x in tax_and_category if x[1].account_expense]
        if expense_accounts:
            line.account = expense_accounts[0]

        # check if calculated 'amount' matches with amount from xml-data
        xml_amount = line_data.get('total', {}).pop('amount', None)
        if xml_amount is not None:
            if xml_amount != line.get_amount(None):
                raise UserError(gettext(
                    'document_incoming_invoice_xml.msg_line_amount_invalid',
                    linetxt=line.description,
                    calcsum=Report.format_currency(
                        line.get_amount(None), None, invoice.currency),
                    xmlsum=Report.format_currency(
                        xml_amount, None, invoice.currency)))

        # cleanup used values, the keys may be missing in xml-data
        for x in ['quantity', 'unit_net_price', 'total']:
            if not line_data.get(x, {}):
                line_data.pop(x, None)

        # note of line
        notes = [line_data.pop('line_note', None)]

        # skipped xml-data
        convert_notes = line_data.pop('convert_note', None)
        if convert_notes:
            if not isinstance(convert_notes, list):
                convert_notes = [str(convert_notes)]
            notes.extend([
                ' ',
                gettext('document_incoming_invoice_xml.msg_convert_note'),
                ] + convert_notes)

        # values not used to create invoice-line
        if line_data:
            notes.extend([
                ' ',
                gettext(
                    'document_incoming_invoice_xml.msg_unused_linevalues'),
                json.dumps(line_data, cls=JSONEncoder, indent=3)])

        line.note = '\n'.join(x for x in notes if x)
        line.on_change_invoice()
        return line

    def _readxml_invoice_line(self, xmldata, xpth, pos):
        """ read invoice-line from xml

        Args:
            xmldata (xmldata): xml-tree object
            xpth (list): xpath to invoice-line
            pos (int): number of line to read

        Returns:
            dict: invoice line data, empty values are skipped
        """
        xpath_line = xpth + [pos]

        def read_listdata(xtag, key_list):
            """ read list of values from xml

            Args:
                xtag (str or list): xml-tag(s) below the invoice-line
                    to read as list
                key_list (list): [('<xml-tag>', '<key>'[, <type>])]

            Returns:
                list: list of dict, items without values are skipped
            """
            if isinstance(xtag, str):
                xtag = [xtag]
            xpath_list = xpath_line + xtag
            num_listitem = len(self._readxml_nodes(xmldata, xpath_list))

            listdata = []
            for x in range(1, num_listitem + 1):
                values = {}
                for list_item in key_list:
                    (xkey, key, xtype) = (
                        list_item if len(list_item) == 3
                        else tuple(list(list_item) + [None]))
                    value = self._readxml_getvalue(
                        xmldata, xpath_list + [x, xkey], vtype=xtype)
                    if value is not None:
                        values[key] = value
                if values:
                    listdata.append(values)
            return listdata

        def read_first(xtag, key_list):
            """ get 1st item of read_listdata() or None if its empty
            """
            listdata = read_listdata(xtag, key_list)
            return listdata[0] if listdata else None

        def add_skip_notes(xpath_base, tag_names):
            """ notice existing but ignored xml-elements

            Container elements have no text in compact xml,
            therefore the check is done on the existence of the node.
            """
            for x in tag_names:
                xp_to_check = xpath_base + ['ram:' + x]
                if self._readxml_nodes(xmldata, xp_to_check):
                    result['convert_note'].append(
                        'skip: ' + self._readxml_xpath(xp_to_check))

        result = {'convert_note': []}
        result['line_no'] = self._readxml_getvalue(xmldata, xpath_line + [
            'ram:AssociatedDocumentLineDocument', 'ram:LineID'])
        result['line_note'] = '\n'.join([
            x for x in self._readxml_getvalue(
                xmldata, xpath_line + [
                    'ram:AssociatedDocumentLineDocument',
                    'ram:IncludedNote', 'ram:Content'],
                allow_list=True)
            if x])

        for xkey, key in [
                ('ram:ID', 'prod_id'),
                ('ram:GlobalID', 'glob_id'),
                ('ram:SellerAssignedID', 'seller_id'),
                ('ram:BuyerAssignedID', 'buyer_id'),
                ('ram:IndustryAssignedID', 'industy_id'),
                ('ram:ModelID', 'model_id'),
                ('ram:Name', 'name'),
                ('ram:Description', 'description'),
                ('ram:BatchID', 'lot'),
                ('ram:BrandName', 'brand_name'),
                ('ram:ModelName', 'model_name'),
                ('ram:OriginTradeCountry', 'trade_country')
                ]:
            result[key] = self._readxml_getvalue(xmldata, xpath_line + [
                'ram:SpecifiedTradeProduct', xkey])

        # attributes of product
        result['attributes'] = read_listdata([
            'ram:SpecifiedTradeProduct',
            'ram:ApplicableProductCharacteristic'], [
            ('ram:TypeCode', 'code'),
            ('ram:Description', 'description'),
            ('ram:ValueMeasure', 'uom'),
            ('ram:ValueY', 'value')])

        # classification of product
        result['classification'] = read_listdata([
            'ram:SpecifiedTradeProduct',
            'ram:DesignatedProductClassification'], [
            ('ram:ClassCode', 'code'),
            ('ram:ClassName', 'name')])

        # serial-numbers of product
        result['serialno'] = read_listdata([
            'ram:SpecifiedTradeProduct',
            'ram:IndividualTradeProductInstance'], [
            ('ram:BatchID', 'lot'),
            ('ram:SupplierAssignedSerialID', 'serial')])

        # referenced product
        result['refprod'] = read_listdata(
            ['ram:SpecifiedTradeProduct', 'ram:IncludedReferencedProduct'], [
                ('ram:ID', 'id'),
                ('ram:GlobalID', 'global_id'),
                ('ram:SellerAssignedID', 'seller_id'),
                ('ram:BuyerAssignedID', 'buyer_id'),
                ('ram:Name', 'name'),
                ('ram:Description', 'description'),
                ('ram:UnitQuantity', 'quantity', Decimal),
            ])

        price_fields = [
            ('ram:ChargeAmount', 'amount', Decimal),
            ('ram:BasisQuantity', 'basequantity', Decimal)]

        # net price
        xtag_netprice = [
            'ram:SpecifiedLineTradeAgreement',
            'ram:NetPriceProductTradePrice']
        result['unit_net_price'] = read_first(xtag_netprice, price_fields)
        # notice ignored field
        add_skip_notes(
            xpath_line + xtag_netprice, ['AppliedTradeAllowanceCharge'])

        # gross price
        xtag_grossprice = [
            'ram:SpecifiedLineTradeAgreement',
            'ram:GrossPriceProductTradePrice']
        result['unit_gross_price'] = read_first(
            xtag_grossprice, price_fields)
        # notice ignored field
        add_skip_notes(
            xpath_line + xtag_grossprice, ['AppliedTradeAllowanceCharge'])

        # quantity
        xpath_quantity = xpath_line + ['ram:SpecifiedLineTradeDelivery']
        result['quantity'] = read_first(
            ['ram:SpecifiedLineTradeDelivery'], [
                ('ram:BilledQuantity', 'billed', Decimal),
                ('ram:ChargeFreeQuantity', 'chargefree', Decimal),
                ('ram:PackageQuantity', 'package', Decimal),
            ])
        # notice ignored fields
        add_skip_notes(xpath_quantity, [
            'ShipToTradeParty', 'UltimateShipToTradeParty',
            'ActualDeliverySupplyChainEvent',
            'DespatchAdviceReferencedDocument',
            'ReceivingAdviceReferencedDocument',
            'DeliveryNoteReferencedDocument'])

        # taxes
        xpath_trade = xpath_line + ['ram:SpecifiedLineTradeSettlement']
        result['taxes'] = read_listdata([
            'ram:SpecifiedLineTradeSettlement',
            'ram:ApplicableTradeTax'], [
                ('ram:CalculatedAmount', 'amount', Decimal),
                ('ram:TypeCode', 'type'),
                ('ram:ExemptionReason', 'reason'),
                ('ram:BasisAmount', 'base', Decimal),
                ('ram:LineTotalBasisAmount', 'basetotal', Decimal),
                ('ram:AllowanceChargeBasisAmount', 'feebase', Decimal),
                ('ram:CategoryCode', 'category_code'),
                ('ram:ExemptionReasonCode', 'reason_code'),
                ('ram:TaxPointDate', 'taxdate', date),
                ('ram:DueDateTypeCode', 'duecode'),
                ('ram:RateApplicablePercent', 'percent', Decimal)])

        # total amounts
        result['total'] = read_first([
            'ram:SpecifiedLineTradeSettlement',
            'ram:SpecifiedTradeSettlementLineMonetarySummation'], [
                ('ram:LineTotalAmount', 'amount', Decimal),
                ('ram:ChargeTotalAmount', 'charge', Decimal),
                ('ram:AllowanceTotalAmount', 'fee', Decimal),
                ('ram:TaxTotalAmount', 'tax', Decimal),
                ('ram:GrandTotalAmount', 'grand', Decimal),
                ('ram:TotalAllowanceChargeAmount', 'feecharge', Decimal),
            ])
        # notice ignored fields
        add_skip_notes(xpath_trade, [
            'BillingSpecifiedPeriod', 'SpecifiedTradeAllowanceCharge',
            'ActualDeliverySupplyChainEvent', 'InvoiceReferencedDocument',
            'AdditionalReferencedDocument',
            'ReceivableSpecifiedTradeAccountingAccount'])

        # skip None values
        return {x: result[x] for x in result if result[x]}

    def _readxml_convertdate(self, date_string):
        """ convert date-string to python-date

        Args:
            date_string (str): date in format 'YYYYMMDD'

        Returns:
            date: converted value, None if 'date_string' is empty

        Raises:
            UserError: if 'date_string' is not a valid date
        """
        if not date_string:
            return None

        try:
            return datetime.strptime(date_string, '%Y%m%d').date()
        except ValueError:
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_convert_error',
                msg='cannot convert %(val)s' % {
                    'val': date_string}))

    def _facturx_detect_content(self):
        """ check xml-data against xsd of XRechnung and FacturX,
            begin with extended, goto minimal, then xrechnung

        Returns:
            tuple: ('xsd type', '<function to read xml>', <xml etree>)
            defaults to None if not detected
        """
        # the document comes from outside: dont resolve entities,
        # dont load anything from network
        parser = etree.XMLParser(resolve_entities=False, no_network=True)
        xml_data = etree.fromstring(self.data, parser=parser)

        module_path = os.path.dirname(__file__)
        for xsdpath, xsdtype, funcname in xml_types:
            fname = os.path.join(module_path, *xsdpath)
            schema = etree.XMLSchema(etree.parse(fname))
            try:
                schema.assertValid(xml_data)
            except etree.DocumentInvalid:
                # not this type, try the next schema
                continue
            return (xsdtype, funcname, xml_data)
        return None

# end Incoming