read parties of supplier+company from xml + tests

This commit is contained in:
Frederik Jaeckel 2025-01-07 15:32:51 +01:00
parent 8a48fb7236
commit 0a4e933188
6 changed files with 338 additions and 13 deletions

View file

@ -8,7 +8,7 @@
import os.path
from lxml import etree
from datetime import datetime
from trytond.pool import PoolMeta
from trytond.pool import PoolMeta, Pool
from trytond.transaction import Transaction
from trytond.exceptions import UserError
from trytond.i18n import gettext
@ -116,7 +116,9 @@ class Incoming(metaclass=PoolMeta):
if not allow_list:
break
if not allow_list:
return result[0]
if result:
return result[0]
return None
return result
def _readxml_getattrib(self, xmltree, tags, attrib, vtype=None):
@ -141,6 +143,150 @@ class Incoming(metaclass=PoolMeta):
result = vtype(result)
return result
def _readxml_find_party(self, party_data):
""" find party by search with data from xml-file
Args:
party_data (dict): data of party read from xml
Returns:
int: id of party or None
"""
pool = Pool()
Party = pool.get('party.party')
Address = pool.get('party.address')
if party_data['name']:
query = [(
'name', 'ilike',
'%%%(name)s%%' % {'name': party_data['name']})]
if len(set({'postal_code', 'street', 'city'}).intersection(
set(party_data.keys()))) == 3:
# ignore capitalization
address_sql = Address.search([
('city', 'ilike', party_data['city']),
('postal_code', '=', party_data['postal_code']),
('street', 'ilike',
party_data['street'].split('\n')[0] + '%')],
query=True)
query.append(('addresses', 'in', address_sql))
party = Party.search(query)
if party:
# we use this party if its a exact match
if len(party) == 1:
return party[0].id
return None
def _readxml_create_party(self, partydata):
""" create party with data from xml
Args:
partydata (dict): data of party, read from xml
Returns:
int: id of created party
"""
pool = Pool()
Party = pool.get('party.party')
Invoice = pool.get('account.invoice')
Company = pool.get('company.company')
if not partydata['name']:
return None
to_create = {'name': partydata['name']}
address = {
x: partydata[x]
for x in ['street', 'postal_code', 'city', 'country',
'subdivision']
if x in partydata.keys()}
# if no country, we use the company-country
if 'country' not in address.keys():
company = Invoice.default_company()
if company:
company = Company(company)
company_address = company.party.address_get()
if company_address and company_address.country:
address['country'] = company_address.country.id
if address:
to_create['addresses'] = [('create', [address])]
party, = Party.create([to_create])
return party.id
def _readxml_party_data(self, xmltree, tags, create_party=False):
""" read party data
Args:
xmltree (xmldata): XML-tree
tags (list): tags to build xpath
create_party (boolean, optional): create party if not found
Returns:
dict: data of party
"""
pool = Pool()
Country = pool.get('country.country')
SubDivision = pool.get('country.subdivision')
result = {}
# name
result['name'] = self._readxml_getvalue(
xmltree, tags + ['ram:Name'])
result['postal_code'] = self._readxml_getvalue(
xmltree, tags + ['ram:PostalTradeAddress', 'ram:PostcodeCode'])
if not result['postal_code']:
del result['postal_code']
# address, max. 3 lines
result['street'] = [self._readxml_getvalue(
xmltree, tags + ['ram:PostalTradeAddress', 'ram:LineOne'])]
result['street'].append(self._readxml_getvalue(
xmltree, tags + ['ram:PostalTradeAddress', 'ram:LineTwo']))
result['street'].append(self._readxml_getvalue(
xmltree, tags + ['ram:PostalTradeAddress', 'ram:LineThree']))
result['street'] = '\n'.join([x for x in result['street'] if x])
if not result['street']:
del result['street']
# city
result['city'] = self._readxml_getvalue(
xmltree, tags + ['ram:PostalTradeAddress', 'ram:CityName'])
if not result['city']:
del result['city']
# country
country_code = self._readxml_getvalue(
xmltree,
tags + ['ram:PostalTradeAddress', 'ram:CountryID'])
if country_code:
country = Country.search([('code', '=', country_code.upper())])
if country:
result['country'] = country[0].id
# subdivision
subdivision = self._readxml_getvalue(
xmltree,
tags + ['ram:PostalTradeAddress', 'ram:CountrySubDivisionName'])
if subdivision and ('country' in result.keys()):
subdiv = SubDivision.search([
('name', '=', subdivision),
('country', '=', result['country'])])
if subdiv:
result['subdivision'] = subdiv[0].id
party_id = self._readxml_find_party(result)
if party_id:
result['party'] = party_id
else:
if create_party:
party_id = self._readxml_create_party(result)
if party_id:
result['party'] = party_id
return result
def _readxml_facturx_extended(self, invoice, xmltree):
""" read factur-x extended
@ -150,6 +296,10 @@ class Incoming(metaclass=PoolMeta):
Returns:
record: model account.invoice
"""
Configuration = Pool().get('document.incoming.configuration')
config = Configuration.get_singleton()
# check invoice-type:
# allowed codes 380 incoice, 381 credit note
inv_code = self._readxml_getvalue(xmltree, [
@ -189,13 +339,46 @@ class Incoming(metaclass=PoolMeta):
if note_list:
invoice.description = note_list[0].get('Content', None)
invoice.comment = '\n'.join([
'%(code)s%(subj)s%(msg)s' % {
'code': ('Code=%s, ' % x.get('ContentCode', ''))
if x.get('ContentCode', '') else '',
'subj': ('Subject=%s, ' % x.get('SubjectCode', ''))
if x.get('SubjectCode', '') else '',
'msg': x.get('Content', ''),
} for x in note_list[1:]])
'%(code)s%(subj)s%(msg)s' % {
'code': ('Code=%s, ' % x.get('ContentCode', ''))
if x.get('ContentCode', '') else '',
'subj': ('Subject=%s, ' % x.get('SubjectCode', ''))
if x.get('SubjectCode', '') else '',
'msg': x.get('Content', ''),
} for x in note_list[1:]])
# supplier party
add_party = config and config.create_supplier
seller_party = self._readxml_party_data(xmltree, [
'rsm:CrossIndustryInvoice', 'rsm:SupplyChainTradeTransaction',
'ram:ApplicableHeaderTradeAgreement',
'ram:SellerTradeParty'], create_party=add_party)
if seller_party:
if 'party' in seller_party.keys():
invoice.party = seller_party['party']
invoice.on_change_party()
else:
raise UserError(gettext(
'document_incoming_invoice_xml.msg_no_supplierparty',
partytxt=', '.join([
seller_party[x].replace('\n', '; ')
for x in seller_party.keys()])))
# company party
buyer_party = self._readxml_party_data(xmltree, [
'rsm:CrossIndustryInvoice', 'rsm:SupplyChainTradeTransaction',
'ram:ApplicableHeaderTradeAgreement',
'ram:BuyerTradeParty'], create_party=False)
# check if we found our company
if config and not config.accept_other_company:
company_party_id = self._readxml_find_party(buyer_party)
if not (company_party_id and
(company_party_id == self.company.party.id)):
raise UserError(gettext(
'document_incoming_invoice_xml.msg_not_our_company',
partytxt=', '.join([
buyer_party[x].replace('\n', '; ')
for x in buyer_party.keys()])))
return invoice