Files
crpt-aggregation/xml_generator.py
2026-05-08 14:59:56 +03:00

641 lines
26 KiB
Python

#!/usr/bin/env python3
"""
XML Generator CLI Tool
Processes semicolon-separated CSV files to generate XML with pack_content sections.
Includes validation against set dictionary rules.
"""
import click
import csv
import xml.etree.ElementTree as ET
from collections import defaultdict, Counter
from pathlib import Path
import html
import re
import uuid
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any
class CSVReader:
    """Read semicolon-separated CSV files and group their rows by pack code.

    Both readers require a header row with a ``SET CIS`` column and the column
    named by ``cis_column``.  Rows in which either value is empty or missing
    are skipped.
    """

    @staticmethod
    def clean_csv_columns(fieldnames: List[str]) -> List[str]:
        """Return ``fieldnames`` with any leading BOM character stripped."""
        return [field.lstrip('\ufeff') for field in fieldnames]

    @staticmethod
    def _cell(row: Dict[str, Any], column: str) -> str:
        """Return the stripped text of ``row[column]`` or ``''`` when absent.

        ``csv.DictReader`` stores ``None`` for cells missing from a short row,
        so a plain ``row.get(column, '').strip()`` raises AttributeError.
        """
        value = row.get(column)
        return value.strip() if isinstance(value, str) else ''

    @staticmethod
    def _read_rows(file_path: str, encoding: str) -> List[Dict[str, Any]]:
        """Load every data row of the CSV file as a dict keyed by clean header names."""
        # newline='' lets the csv module do its own line-ending handling.
        with open(file_path, 'r', encoding=encoding, newline='') as file:
            reader = csv.DictReader(file, delimiter=';')
            if reader.fieldnames:
                reader.fieldnames = CSVReader.clean_csv_columns(reader.fieldnames)
            return list(reader)

    @staticmethod
    def read_csv_with_gtins(file_path: str, cis_column: str = "Код",
                            encoding: str = 'utf-8-sig') -> Dict[str, List[Dict[str, str]]]:
        """Group rows by ``SET CIS``, keeping CIS code and GTIN details.

        Args:
            file_path: Path to the semicolon-separated CSV file.
            cis_column: Header name of the column holding item CIS codes.
            encoding: Text encoding used to open the file.

        Returns:
            Mapping of pack code to a list of dicts with the keys
            ``cis_code``, ``set_gtin`` and ``gtin`` (empty string when the
            cell is missing).
        """
        pack_data = defaultdict(list)
        for row in CSVReader._read_rows(file_path, encoding):
            set_cis = CSVReader._cell(row, 'SET CIS')
            cis_code = CSVReader._cell(row, cis_column)
            if set_cis and cis_code:
                pack_data[set_cis].append({
                    'cis_code': cis_code,
                    'set_gtin': CSVReader._cell(row, 'SET GTIN'),
                    'gtin': CSVReader._cell(row, 'GTIN'),
                })
        return pack_data

    @staticmethod
    def read_csv_simple(file_path: str, cis_column: str = "CIS",
                        encoding: str = 'utf-8-sig') -> Dict[str, List[str]]:
        """Group item CIS codes by ``SET CIS``.

        Args:
            file_path: Path to the semicolon-separated CSV file.
            cis_column: Header name of the column holding item CIS codes.
            encoding: Text encoding used to open the file.

        Returns:
            Mapping of pack code to the list of its item CIS codes.
        """
        pack_data = defaultdict(list)
        for row in CSVReader._read_rows(file_path, encoding):
            set_cis = CSVReader._cell(row, 'SET CIS')
            cis_code = CSVReader._cell(row, cis_column)
            if set_cis and cis_code:
                pack_data[set_cis].append(cis_code)
        return pack_data
class SetDictionary:
    """Set composition rules loaded from a semicolon-separated CSV file.

    The file needs the columns ``GTIN SET``, ``GTIN ITEM`` and ``COUNT``;
    ``SET NAME`` is optional.  Rules are stored in ``self.rules`` keyed by
    set GTIN.
    """

    def __init__(self, dict_file_path: str):
        """Load the rules from ``dict_file_path`` immediately."""
        self.rules = self._load_set_dict(dict_file_path)

    @staticmethod
    def _cell(row: Dict[str, Any], column: str) -> str:
        """Return the stripped text of ``row[column]`` or ``''`` when absent.

        ``csv.DictReader`` stores ``None`` for cells missing from a short row,
        so calling ``.strip()`` on the raw value would raise AttributeError.
        """
        value = row.get(column)
        return value.strip() if isinstance(value, str) else ''

    def _load_set_dict(self, dict_file_path: str) -> Dict[str, List[Dict[str, Any]]]:
        """Parse the dictionary CSV into ``{set_gtin: [rule, ...]}``.

        Each rule is a dict with ``gtin_item``, ``count`` (float) and
        ``set_name``.  Rows missing the set GTIN, item GTIN or count, and rows
        whose count is not a number, are skipped.
        """
        set_rules = defaultdict(list)
        with open(dict_file_path, 'r', encoding='utf-8-sig') as file:
            reader = csv.DictReader(file, delimiter=';')
            if reader.fieldnames:
                # Strip any BOM still attached to a header name.
                reader.fieldnames = [name.lstrip('\ufeff') for name in reader.fieldnames]
            for row in reader:
                gtin_set = self._cell(row, 'GTIN SET')
                gtin_item = self._cell(row, 'GTIN ITEM')
                count = self._cell(row, 'COUNT')
                if not (gtin_set and gtin_item and count):
                    continue
                try:
                    count_num = float(count)
                except ValueError:
                    # Best effort: an unparsable count drops only this row.
                    continue
                set_rules[gtin_set].append({
                    'gtin_item': gtin_item,
                    'count': count_num,
                    'set_name': self._cell(row, 'SET NAME'),
                })
        return set_rules

    def get_rules(self) -> Dict[str, List[Dict[str, Any]]]:
        """Return the loaded rules keyed by set GTIN."""
        return self.rules

    def get_rule_count(self) -> int:
        """Return the number of distinct set GTINs that have rules."""
        return len(self.rules)
class PackValidator:
    """Check the GTIN composition of packs against set dictionary rules."""

    def __init__(self, set_dictionary: "SetDictionary"):
        """Keep the dictionary whose ``get_rules()`` supplies the expected compositions."""
        self.set_dict = set_dictionary

    def validate_composition(self, pack_data: Dict[str, List[Dict[str, str]]]) -> List[Dict[str, str]]:
        """Validate every non-empty pack and return one result dict per pack.

        Each result has the keys ``set_cis``, ``set_gtin``, ``set_name``,
        ``status`` (``OK``, ``WARNING`` or ``ERROR``) and ``message``.
        """
        return [
            self._validate_single_pack(set_cis, items)
            for set_cis, items in pack_data.items()
            if items
        ]

    @staticmethod
    def _build_result(set_cis: str, set_gtin: str, set_name: str,
                      status: str, message: str) -> Dict[str, str]:
        """Assemble a result dict so every status shares the same keys."""
        return {
            'set_cis': set_cis,
            'set_gtin': set_gtin,
            'set_name': set_name,
            'status': status,
            'message': message,
        }

    def _validate_single_pack(self, set_cis: str, items: List[Dict[str, str]]) -> Dict[str, str]:
        """Validate one pack; errors take precedence over warnings."""
        # The set GTIN is taken from the first item (should be the same for
        # all items in the pack).
        set_gtin = items[0].get('set_gtin', '')
        expected_items = self.set_dict.get_rules().get(set_gtin, [])
        if not expected_items:
            return self._build_result(
                set_cis, set_gtin, '', 'WARNING',
                f'No rules found for SET GTIN: {set_gtin}'
            )

        # Items without a GTIN are left out of the comparison.
        actual_counts = Counter(item['gtin'] for item in items if item.get('gtin'))
        # Counts are stored as floats by the dictionary; compare as integers.
        expected_counts = {rule['gtin_item']: int(rule['count']) for rule in expected_items}
        # The first non-empty set name among the rules labels the result.
        set_name = next((rule['set_name'] for rule in expected_items if rule['set_name']), '')

        errors = self._check_composition_errors(actual_counts, expected_counts)
        if errors:
            return self._build_result(set_cis, set_gtin, set_name, 'ERROR', '; '.join(errors))
        warnings = self._check_composition_warnings(actual_counts, expected_counts)
        if warnings:
            return self._build_result(set_cis, set_gtin, set_name, 'WARNING', '; '.join(warnings))
        return self._build_result(set_cis, set_gtin, set_name, 'OK', 'Composition is valid')

    def _check_composition_errors(self, actual_counts: Counter, expected_counts: Dict[str, int]) -> List[str]:
        """Return messages for expected GTINs that are missing or have the wrong count."""
        errors = []
        for expected_gtin, expected_count in expected_counts.items():
            actual_count = actual_counts.get(expected_gtin, 0)
            if actual_count == expected_count:
                # Includes an expected count of 0 with no items present.
                continue
            if actual_count == 0:
                errors.append(f'Missing GTIN {expected_gtin} (expected {expected_count})')
            else:
                errors.append(
                    f'Wrong count for GTIN {expected_gtin}: got {actual_count}, expected {expected_count}'
                )
        return errors

    def _check_composition_warnings(self, actual_counts: Counter, expected_counts: Dict[str, int]) -> List[str]:
        """Return messages for GTINs present in the pack but absent from the rules."""
        return [
            f'Unexpected GTIN {actual_gtin} (count: {actual_count})'
            for actual_gtin, actual_count in actual_counts.items()
            if actual_gtin not in expected_counts
        ]
class XMLGenerator:
    """Build ``<pack_content>`` sections and merge them into an XML template."""

    @staticmethod
    def escape_xml_content(text: str) -> str:
        """Make ``text`` safe to embed inside a ``<![CDATA[...]]>`` section.

        Markup characters need no escaping inside CDATA; the only sequence
        that cannot appear is the terminator ``]]>``.  It is split across two
        adjacent CDATA sections so a parser reads back the original text.
        """
        return text.replace(']]>', ']]]]><![CDATA[>')

    @staticmethod
    def generate_pack_content_xml(pack_data: Dict[str, List[str]]) -> List[str]:
        """Render one ``<pack_content>`` block per pack code.

        Args:
            pack_data: Mapping of pack code to its item CIS codes.  An item
                may also be a dict carrying the code under ``cis_code``.

        Returns:
            One multi-line XML string per pack, in mapping order.
        """
        pack_contents = []
        for set_cis, cis_codes in pack_data.items():
            escaped_pack = XMLGenerator.escape_xml_content(set_cis)
            pack_content_lines = [
                ' <pack_content>',
                f' <pack_code><![CDATA[{escaped_pack}]]></pack_code>',
            ]
            for item in cis_codes:
                # Accept the dict-shaped items produced by the GTIN-aware loaders.
                cis_code = item['cis_code'] if isinstance(item, dict) else item
                escaped_cis = XMLGenerator.escape_xml_content(cis_code)
                pack_content_lines.append(f' <cis><![CDATA[{escaped_cis}]]></cis>')
            pack_content_lines.append(' </pack_content>')
            pack_contents.append('\n'.join(pack_content_lines))
        return pack_contents

    @staticmethod
    def process_xml_template(template_path: str, pack_contents: List[str],
                             document_id: Optional[str] = None,
                             document_number: Optional[str] = None,
                             operation_time: Optional[str] = None) -> str:
        """Read the template, substitute document attributes and insert the packs.

        Args:
            template_path: Path to the UTF-8 XML template.
            pack_contents: Rendered ``<pack_content>`` blocks to insert.
            document_id: New ``document_id`` attribute value, if any.
            document_number: New ``document_number`` attribute value, if any.
            operation_time: New ``operation_date_time`` attribute value, if any.

        Returns:
            The resulting XML document as a string.
        """
        with open(template_path, 'r', encoding='utf-8') as file:
            template_content = file.read()
        template_content = XMLGenerator._replace_template_parameters(
            template_content, document_id, document_number, operation_time
        )
        return XMLGenerator._insert_pack_content(template_content, pack_contents)

    @staticmethod
    def _replace_template_parameters(template_content: str,
                                     document_id: Optional[str],
                                     document_number: Optional[str],
                                     operation_time: Optional[str]) -> str:
        """Overwrite the document attributes for which a value was supplied.

        Values are escaped for use inside a double-quoted XML attribute and
        substituted through a callable, so backslashes in a value are not
        interpreted by ``re.sub`` as group references or escapes.
        """
        substitutions = (
            (r'document_id="[^"]*"', 'document_id', document_id),
            (r'document_number="[^"]*"', 'document_number', document_number),
            (r'operation_date_time="[^"]*"', 'operation_date_time', operation_time),
        )
        for pattern, attribute, value in substitutions:
            if not value:
                continue
            new_attr = f'{attribute}="{html.escape(value, quote=True)}"'
            template_content = re.sub(
                pattern, lambda _match, text=new_attr: text, template_content
            )
        return template_content

    @staticmethod
    def _insert_pack_content(template_content: str, pack_contents: List[str]) -> str:
        """Drop the template's placeholder packs and insert the real ones.

        Lines belonging to ``<pack_content>`` blocks that appear before the
        first line containing ``</Document>`` are removed, and
        ``pack_contents`` is inserted just above that line.

        NOTE: if no line contains ``</Document>``, nothing is inserted.
        """
        result_lines = []
        pack_content_inserted = False
        inside_pack_content = False
        for line in template_content.split('\n'):
            if pack_content_inserted:
                result_lines.append(line)
                continue
            if '<pack_content>' in line or '</pack_content>' in line:
                # Still inside a block only if the last tag on this line is an
                # opening one; a block opened and closed on one line is over.
                inside_pack_content = (
                    line.rfind('<pack_content>') > line.rfind('</pack_content>')
                )
                continue
            if inside_pack_content:
                continue
            if '</Document>' in line:
                result_lines.extend(pack_contents)
                pack_content_inserted = True
            result_lines.append(line)
        return '\n'.join(result_lines)
class ParameterGenerator:
    """Produce and check the document-level parameters written into the XML."""

    @staticmethod
    def generate_document_id() -> str:
        """Return ``unit_pack_`` followed by a random UUID as 32 upper-case hex digits."""
        unique_part = uuid.uuid4().hex.upper()
        return "unit_pack_" + unique_part

    @staticmethod
    def generate_operation_time() -> str:
        """Return the current local time as an ISO 8601 string."""
        current = datetime.now()
        return current.isoformat()

    @staticmethod
    def validate_operation_time(operation_time: str) -> bool:
        """Tell whether ``operation_time`` parses as an ISO timestamp.

        ``Z`` is rewritten to ``+00:00`` before parsing.
        """
        try:
            datetime.fromisoformat(operation_time.replace('Z', '+00:00'))
        except ValueError:
            return False
        else:
            return True
class ValidationReporter:
    """Print validation outcomes through ``click.echo``."""

    @staticmethod
    def report_validation_summary(validation_results: List[Dict[str, str]]) -> Tuple[int, int, int]:
        """Echo a one-line tally and return it as ``(ok, warnings, errors)``."""
        tally = Counter(entry['status'] for entry in validation_results)
        ok = tally['OK']
        warnings = tally['WARNING']
        errors = tally['ERROR']
        click.echo(f"Validation results: {ok} OK, {warnings} warnings, {errors} errors")
        return ok, warnings, errors

    @staticmethod
    def report_validation_details(validation_results: List[Dict[str, str]], show_ok: bool = False):
        """Echo one line per result.

        Errors go to stderr, warnings to stdout; OK results are printed only
        when ``show_ok`` is true.
        """
        for entry in validation_results:
            status = entry['status']
            if status == 'ERROR':
                click.echo(f"❌ ERROR: {entry['set_cis']} - {entry['message']}", err=True)
                continue
            if status == 'WARNING':
                click.echo(f"⚠️ WARNING: {entry['set_cis']} - {entry['message']}")
                continue
            if show_ok:
                click.echo(f"✅ OK: {entry['set_cis']} - {entry['message']}")
class DryRunReporter:
    """Print a preview of what a real run would process."""

    @staticmethod
    def report_pack_data_preview(pack_data: Dict[str, List], max_items: int = 3):
        """Echo the first ``max_items`` packs, with up to two codes of each."""
        codes_shown = 2
        click.echo("\nDry run - would process:")
        previewed = list(pack_data.items())[:max_items]
        for set_cis, items in previewed:
            click.echo(f" SET CIS: {set_cis}")
            click.echo(f" CIS codes: {len(items)} items")
            for item in items[:codes_shown]:
                # Items are plain codes or dicts carrying the code under 'cis_code'.
                code = item['cis_code'] if isinstance(item, dict) else item
                click.echo(f" - {code}")
            if len(items) > codes_shown:
                click.echo(f" ... and {len(items) - 2} more")
        if len(pack_data) > max_items:
            click.echo(f" ... and {len(pack_data) - max_items} more SET CIS codes")

    @staticmethod
    def report_parameters(document_id: str, document_number: Optional[str], operation_time: str):
        """Echo the document parameters a real run would use."""
        shown_number = document_number or 'Not specified'
        click.echo("\nWould use parameters:")
        click.echo(f" Document ID: {document_id}")
        click.echo(f" Document Number: {shown_number}")
        click.echo(f" Operation Time: {operation_time}")
# Module-level function names kept for backward compatibility with older callers.
def read_csv_file(file_path: str, cis_column: str = "Код") -> Dict[str, List[str]]:
    """Legacy alias that delegates to ``CSVReader.read_csv_simple``."""
    grouped = CSVReader.read_csv_simple(file_path, cis_column)
    return grouped
def read_csv_file_with_gtins(file_path: str, cis_column: str = "Код") -> Dict[str, List[Dict[str, str]]]:
    """Legacy alias that delegates to ``CSVReader.read_csv_with_gtins``."""
    grouped = CSVReader.read_csv_with_gtins(file_path, cis_column)
    return grouped
def load_set_dict(dict_file_path: str) -> Dict[str, List[Dict[str, Any]]]:
    """Legacy alias: load a ``SetDictionary`` and return its rules mapping."""
    dictionary = SetDictionary(dict_file_path)
    return dictionary.get_rules()
def validate_pack_composition(pack_data: Dict[str, List[Dict[str, str]]],
                              set_rules: Dict[str, List[Dict[str, Any]]]) -> List[Dict[str, str]]:
    """Legacy alias: validate ``pack_data`` against an already-loaded rules mapping."""
    # Bypass __init__ (which reads a file) and attach the given rules directly.
    rules_holder = SetDictionary.__new__(SetDictionary)
    rules_holder.rules = set_rules
    return PackValidator(rules_holder).validate_composition(pack_data)
def escape_xml_content(text: str) -> str:
    """Legacy alias that delegates to ``XMLGenerator.escape_xml_content``."""
    escaped = XMLGenerator.escape_xml_content(text)
    return escaped
def generate_pack_content_xml(pack_data: Dict[str, List[str]]) -> List[str]:
    """Legacy alias that delegates to ``XMLGenerator.generate_pack_content_xml``."""
    sections = XMLGenerator.generate_pack_content_xml(pack_data)
    return sections
def process_xml_template(template_path: str, pack_contents: List[str],
                         document_id: Optional[str] = None,
                         document_number: Optional[str] = None,
                         operation_time: Optional[str] = None) -> str:
    """Legacy alias that delegates to ``XMLGenerator.process_xml_template``."""
    return XMLGenerator.process_xml_template(
        template_path,
        pack_contents,
        document_id,
        document_number,
        operation_time,
    )
class XMLGeneratorApp:
    """Coordinate CSV loading, optional validation and XML generation."""

    def __init__(self, csv_file: str, template_file: str, cis_column: str = "Код",
                 set_dict_file: Optional[str] = None):
        """Store the inputs and create the helper components.

        Args:
            csv_file: Path to the semicolon-separated input CSV.
            template_file: Path to the XML template.
            cis_column: Header name of the column with item CIS codes.
            set_dict_file: Optional set dictionary CSV; when given it is
                loaded here and enables validation.
        """
        self.csv_file = csv_file
        self.template_file = template_file
        self.cis_column = cis_column
        self.set_dict_file = set_dict_file
        # Components
        self.csv_reader = CSVReader()
        self.xml_generator = XMLGenerator()
        self.param_generator = ParameterGenerator()
        self.validation_reporter = ValidationReporter()
        self.dry_run_reporter = DryRunReporter()
        # Optional components, present only when a dictionary is configured
        self.set_dictionary = None
        self.validator = None
        if set_dict_file:
            self.set_dictionary = SetDictionary(set_dict_file)
            self.validator = PackValidator(self.set_dictionary)

    @staticmethod
    def _to_simple_pack_data(pack_data: Dict[str, List]) -> Dict[str, List[str]]:
        """Reduce each pack's items to plain CIS code strings.

        Items may be dicts carrying the code under ``cis_code`` or already
        plain strings; both shapes are accepted.
        """
        return {
            set_cis: [item['cis_code'] if isinstance(item, dict) else item for item in items]
            for set_cis, items in pack_data.items()
        }

    def load_data(self) -> Dict[str, List]:
        """Load the CSV, with GTIN details when a set dictionary is configured.

        Returns:
            Mapping of pack code to a list of item dicts; each dict has at
            least the key ``cis_code``.
        """
        click.echo(f"Reading CSV file: {self.csv_file}")
        click.echo(f"Using CIS column: {self.cis_column}")
        if self.set_dict_file:
            return self.csv_reader.read_csv_with_gtins(self.csv_file, self.cis_column)
        pack_data_simple = self.csv_reader.read_csv_simple(self.csv_file, self.cis_column)
        # Wrap plain codes in dicts so both loading paths return the same shape.
        return {
            set_cis: [{'cis_code': cis} for cis in cis_codes]
            for set_cis, cis_codes in pack_data_simple.items()
        }

    def load_validation_rules(self) -> Optional["SetDictionary"]:
        """Report the dictionary loaded in ``__init__``; return it, or None if not configured."""
        if not self.set_dict_file:
            return None
        click.echo(f"Loading set dictionary: {self.set_dict_file}")
        click.echo(f"Loaded {self.set_dictionary.get_rule_count()} set rules")
        return self.set_dictionary

    def validate_data(self, pack_data: Dict[str, List[Dict[str, str]]]) -> Tuple[List[Dict[str, str]], bool]:
        """Validate pack composition.

        Returns:
            ``(results, has_errors)``; ``([], False)`` when no validator is
            configured.
        """
        if not self.validator:
            return [], False
        click.echo("Validating pack composition...")
        validation_results = self.validator.validate_composition(pack_data)
        _, _, errors = self.validation_reporter.report_validation_summary(validation_results)
        return validation_results, errors > 0

    def generate_parameters(self, document_id: Optional[str], document_number: Optional[str],
                            operation_time: Optional[str]) -> Tuple[str, Optional[str], str]:
        """Fill in a missing document id or operation time.

        A supplied operation time that fails validation only triggers a
        warning on stderr; the value is still returned unchanged.
        """
        if not document_id:
            document_id = self.param_generator.generate_document_id()
        if not operation_time:
            operation_time = self.param_generator.generate_operation_time()
        elif not self.param_generator.validate_operation_time(operation_time):
            click.echo(f"Warning: Invalid operation time format: {operation_time}", err=True)
        return document_id, document_number, operation_time

    def process_dry_run(self, pack_data: Dict[str, List], validation_results: List[Dict[str, str]],
                        document_id: str, document_number: Optional[str], operation_time: str):
        """Print validation details, a data preview and the parameters, writing nothing."""
        if validation_results:
            self.validation_reporter.report_validation_details(validation_results, show_ok=True)
        self.dry_run_reporter.report_pack_data_preview(pack_data)
        self.dry_run_reporter.report_parameters(document_id, document_number, operation_time)

    def generate_xml_output(self, pack_data: Dict[str, List], document_id: str,
                            document_number: Optional[str], operation_time: str) -> str:
        """Render the pack sections and merge them into the template.

        Returns:
            The complete XML document as a string.
        """
        # load_data() always yields dict items, so reduce them to plain codes
        # whether or not a set dictionary is configured.
        simple_pack_data = self._to_simple_pack_data(pack_data)
        click.echo("Generating pack_content sections...")
        pack_contents = self.xml_generator.generate_pack_content_xml(simple_pack_data)
        click.echo(f"Processing template: {self.template_file}")
        return self.xml_generator.process_xml_template(
            self.template_file, pack_contents, document_id, document_number, operation_time
        )

    def save_or_print_output(self, xml_content: str, output_file: Optional[str]):
        """Write the XML to ``output_file`` as UTF-8, or echo it when no file is given."""
        if output_file:
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(xml_content)
            click.echo(f"XML generated successfully: {output_file}")
        else:
            click.echo(xml_content)
@click.command()
@click.argument('csv_file', type=click.Path(exists=True, dir_okay=False))
@click.argument('template_file', type=click.Path(exists=True, dir_okay=False))
@click.option('--output', '-o', type=click.Path(dir_okay=False),
              help='Output XML file path. If not specified, prints to stdout.')
@click.option('--cis-column', '-c', default='CIS',
              help='Column name for CIS codes in CSV file (default: "CIS")')
@click.option('--encoding', '-e', default='utf-8',
              help='CSV file encoding (default: utf-8)')
@click.option('--dry-run', is_flag=True,
              help='Show what would be processed without generating output')
@click.option('--set-dict', type=click.Path(exists=True, dir_okay=False),
              help='Path to set dictionary CSV file for validation')
@click.option('--document-id', type=str,
              help='Document ID to use in XML (auto-generated if not provided)')
@click.option('--document-number', type=str,
              help='Document number to use in XML')
@click.option('--operation-time', type=str,
              help='Operation time in ISO format (auto-generated if not provided)')
@click.option('--validate-only', is_flag=True,
              help='Only validate composition without generating XML')
def generate_xml(csv_file: str, template_file: str, output: Optional[str], cis_column: str,
                 encoding: str, dry_run: bool, set_dict: Optional[str], document_id: Optional[str],
                 document_number: Optional[str], operation_time: Optional[str], validate_only: bool):
    """
    Generate XML file from CSV data and template with validation.

    CSV_FILE: Path to semicolon-separated CSV file containing SET CIS and CIS codes

    TEMPLATE_FILE: Path to XML template file

    The CSV file should contain columns:

    - 'SET CIS': Pack codes that will become <pack_code> elements

    - Column specified by --cis-column: Individual CIS codes that will become <cis> elements

    - 'SET GTIN': SET GTIN codes for validation

    - 'GTIN': Individual GTIN codes for validation
    """
    # The docstring above is the user-facing --help text, so developer notes
    # live in comments.
    # NOTE: ``encoding`` is accepted but has no effect; nothing below passes
    # it on to the application object.
    try:
        app = XMLGeneratorApp(csv_file, template_file, cis_column, set_dict)

        # Report the validation rules (loaded by the constructor), if any.
        app.load_validation_rules()

        pack_data = app.load_data()
        if not pack_data:
            click.echo("No data found in CSV file or no matching columns.", err=True)
            return
        click.echo(f"Found {len(pack_data)} unique SET CIS codes")

        # Validation yields no results when no dictionary was supplied.
        validation_results, has_errors = app.validate_data(pack_data)

        # A plain dry run prints the details itself in process_dry_run();
        # reporting them here too would print every line twice.
        if validation_results and (validate_only or not dry_run):
            app.validation_reporter.report_validation_details(validation_results, show_ok=dry_run)

        if has_errors and not dry_run and not validate_only:
            click.echo("Errors found. Use --dry-run to see all issues or fix them before generating XML.", err=True)
            return
        if validate_only:
            click.echo("Validation complete. Use without --validate-only to generate XML.")
            return

        document_id, document_number, operation_time = app.generate_parameters(
            document_id, document_number, operation_time
        )

        if dry_run:
            app.process_dry_run(pack_data, validation_results, document_id, document_number, operation_time)
            return

        xml_content = app.generate_xml_output(pack_data, document_id, document_number, operation_time)
        app.save_or_print_output(xml_content, output)
    except Exception as e:
        # Top-level boundary: show the message, abort, and keep the original
        # exception chained for debugging.
        click.echo(f"Error: {str(e)}", err=True)
        raise click.Abort() from e
# Run the click command only when this file is executed as a script.
if __name__ == "__main__":
    generate_xml()