Files
crpt-aggregation/xml_generator_2_0.py
2026-05-08 14:59:56 +03:00

633 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
XML Generator 2.0 CLI Tool
Processes XLSX or CSV files to generate XML with pack_content sections.
Includes validation against set dictionary rules.
"""
import csv
import re
import sys
import uuid
import xml.etree.ElementTree as ET
from collections import defaultdict, Counter
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any
from xml.sax.saxutils import escape

import click

try:
    import openpyxl
except ImportError:
    pass  # Will be handled with an explicit error inside DataReader if needed
def _safe_str(val: Any) -> str:
    """Render a spreadsheet cell value as text.

    ``None`` becomes an empty string. A float holding a whole number is
    rendered without the trailing ``.0`` (so counts and GTINs that were read
    as floats keep their integer look). Anything else is converted with
    ``str`` and stripped of surrounding whitespace.
    """
    if val is None:
        return ""
    whole_number_float = isinstance(val, float) and val.is_integer()
    text = str(int(val)) if whole_number_float else str(val).strip()
    return text
class DataReader:
    """Loads tabular input (XLSX or ';'-delimited CSV) and groups it by set code."""

    @staticmethod
    def _read_csv(file_path: str) -> Tuple[List[str], List[Dict[str, Any]]]:
        """Parse a semicolon-delimited CSV file into ``(headers, rows)``.

        The file is decoded as UTF-8 with an optional BOM; a BOM character
        still leading a column name is stripped from the header list and from
        the keys of every row.
        """
        with open(file_path, 'r', encoding='utf-8-sig') as handle:
            parsed = csv.DictReader(handle, delimiter=';')
            names = parsed.fieldnames
            headers = [name.lstrip('\ufeff') for name in names] if names else []
            rows = []
            for record in parsed:
                cleaned = {}
                for key, value in record.items():
                    # Only string keys can carry a BOM; any other key is kept as-is.
                    if isinstance(key, str):
                        key = key.lstrip('\ufeff')
                    cleaned[key] = value
                rows.append(cleaned)
        return headers, rows

    @staticmethod
    def _read_xlsx(file_path: str) -> Tuple[List[str], List[Dict[str, Any]]]:
        """Parse the active sheet of an XLSX workbook into ``(headers, rows)``.

        Row 1 supplies the headers; rows whose cells are all ``None`` are
        skipped. Every header and cell is normalised with ``_safe_str``.

        Raises:
            ImportError: if ``openpyxl`` is not installed.
        """
        try:
            import openpyxl
        except ImportError:
            raise ImportError("Please install openpyxl to read .xlsx files (pip install openpyxl)")
        workbook = openpyxl.load_workbook(file_path, data_only=True)
        sheet = workbook.active
        headers = [_safe_str(cell.value) for cell in sheet[1]]
        rows = [
            dict(zip(headers, [_safe_str(value) for value in values]))
            for values in sheet.iter_rows(min_row=2, values_only=True)
            if not all(value is None for value in values)
        ]
        return headers, rows

    @staticmethod
    def _read_file(file_path: str) -> Tuple[List[str], List[Dict[str, Any]]]:
        """Dispatch on the extension: ``.xlsx`` (any case) is read as XLSX, everything else as CSV."""
        is_xlsx = str(file_path).lower().endswith('.xlsx')
        loader = DataReader._read_xlsx if is_xlsx else DataReader._read_csv
        return loader(file_path)

    @staticmethod
    def _pick_column(row: Dict[str, Any], primary: str, fallback: str) -> Any:
        """Return ``row[primary]`` when that column exists, else ``row[fallback]``, else ``''``."""
        if primary in row:
            return row[primary]
        return row.get(fallback, '')

    @staticmethod
    def read_data_with_gtins(file_path: str, cis_column: str = "Код") -> Dict[str, List[Dict[str, str]]]:
        """Group item records (code, set GTIN, item GTIN) by set code.

        Both the new column names (``SET_CODE``, ``ITEM_CODE``, ``SET_GTIN``,
        ``ITEM_GTIN``) and the old ones (``SET CIS``, *cis_column*,
        ``SET GTIN``, ``GTIN``) are supported. Rows with an empty set code or
        item code are ignored.
        """
        grouped = defaultdict(list)
        _, rows = DataReader._read_file(file_path)
        for row in rows:
            set_cis = DataReader._pick_column(row, 'SET_CODE', 'SET CIS')
            cis_code = DataReader._pick_column(row, 'ITEM_CODE', cis_column)
            set_gtin = DataReader._pick_column(row, 'SET_GTIN', 'SET GTIN')
            gtin = DataReader._pick_column(row, 'ITEM_GTIN', 'GTIN')
            if not (set_cis and cis_code):
                continue
            grouped[set_cis].append({
                'cis_code': cis_code,
                'set_gtin': set_gtin,
                'gtin': gtin,
            })
        return grouped

    @staticmethod
    def read_data_simple(file_path: str, cis_column: str = "CIS") -> Dict[str, List[str]]:
        """Group plain item codes by set code.

        Supports the new column names (``SET_CODE``, ``ITEM_CODE``) and the
        old ones (``SET CIS``, *cis_column*). Rows with an empty set code or
        item code are ignored.
        """
        grouped = defaultdict(list)
        _, rows = DataReader._read_file(file_path)
        for row in rows:
            set_cis = DataReader._pick_column(row, 'SET_CODE', 'SET CIS')
            cis_code = DataReader._pick_column(row, 'ITEM_CODE', cis_column)
            if not (set_cis and cis_code):
                continue
            grouped[set_cis].append(cis_code)
        return grouped
class SetDictionary:
    """Holds the set-composition rules loaded from a dictionary file.

    ``rules`` maps a set GTIN to a list of rule dicts, each with the keys
    ``gtin_item`` (item GTIN), ``count`` (float, units of that item per set)
    and ``set_name``.
    """

    def __init__(self, dict_file_path: str):
        """Load the rules from *dict_file_path* (XLSX or CSV)."""
        self.rules = self._load_set_dict(dict_file_path)

    def _load_set_dict(self, dict_file_path: str) -> Dict[str, List[Dict[str, Any]]]:
        """Read the dictionary file and convert its rows to rules."""
        _, rows = DataReader._read_file(dict_file_path)
        return self._rules_from_rows(rows)

    @staticmethod
    def _rules_from_rows(rows: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
        """Build the rule mapping from already-parsed rows.

        Both the new XLSX column names and the old CSV column names are
        supported. A row is skipped when the set GTIN or item GTIN is empty,
        or when the count is missing or not a number.
        """
        set_rules = defaultdict(list)
        for row in rows:
            gtin_set = row.get('GTIN_FULL', row.get('GTIN SET', ''))
            gtin_item = row.get('GTIN_ITEM_FULL', row.get('GTIN ITEM', ''))
            count_val = row.get('Количество штук в упаковке', row.get('COUNT', ''))
            set_name = row.get('Полное наименование товара', row.get('SET NAME', ''))
            if not (gtin_set and gtin_item):
                continue
            # A CSV row shorter than the header yields None for the missing
            # cells; treat that like an empty count instead of crashing.
            if count_val is None or count_val == '':
                continue
            try:
                count_num = float(count_val)
            except (TypeError, ValueError):
                continue
            set_rules[gtin_set].append({
                'gtin_item': gtin_item,
                'count': count_num,
                'set_name': set_name,
            })
        return set_rules

    def get_rules(self) -> Dict[str, List[Dict[str, Any]]]:
        """Return the mapping of set GTIN to its list of rules."""
        return self.rules

    def get_rule_count(self) -> int:
        """Return the number of distinct set GTINs that have at least one rule."""
        return len(self.rules)
class PackValidator:
    """Checks the item GTINs found in each pack against the set dictionary."""

    def __init__(self, set_dictionary: "SetDictionary"):
        """Keep a reference to the dictionary whose ``get_rules()`` drives validation."""
        self.set_dict = set_dictionary

    def validate_composition(self, pack_data: Dict[str, List[Dict[str, str]]]) -> List[Dict[str, str]]:
        """Validate every non-empty pack and return one result dict per pack.

        Each result has the keys ``set_cis``, ``set_gtin``, ``set_name``,
        ``status`` (``'OK'``, ``'WARNING'`` or ``'ERROR'``) and ``message``.
        """
        return [
            self._validate_single_pack(set_cis, items)
            for set_cis, items in pack_data.items()
            if items
        ]

    @staticmethod
    def _build_result(set_cis: str, set_gtin: str, set_name: str,
                      status: str, message: str) -> Dict[str, str]:
        """Assemble a result dict so that every status carries the same keys."""
        return {
            'set_cis': set_cis,
            'set_gtin': set_gtin,
            'set_name': set_name,
            'status': status,
            'message': message,
        }

    def _validate_single_pack(self, set_cis: str, items: List[Dict[str, str]]) -> Dict[str, str]:
        """Validate one pack.

        The set GTIN is taken from the first item (it should be the same for
        all items of the pack). Missing items and wrong counts are errors;
        items not listed in the rules are warnings; errors take precedence.
        """
        set_gtin = items[0]['set_gtin']
        expected_items = self.set_dict.get_rules().get(set_gtin, [])
        if not expected_items:
            # Same key set as the other outcomes; the name is unknown here.
            return self._build_result(
                set_cis, set_gtin, '', 'WARNING',
                f'No rules found for SET GTIN: {set_gtin}'
            )

        # Items without a GTIN cannot be matched and are left out of the tally.
        actual_counts = Counter(item['gtin'] for item in items if item['gtin'])
        # Rule counts are stored as floats; compare them as whole units.
        expected_counts = {rule['gtin_item']: int(rule['count']) for rule in expected_items}
        # First non-empty name among the rules, or '' when none has one.
        set_name = next((rule['set_name'] for rule in expected_items if rule['set_name']), '')

        errors = self._check_composition_errors(actual_counts, expected_counts)
        if errors:
            return self._build_result(set_cis, set_gtin, set_name, 'ERROR', '; '.join(errors))
        warnings = self._check_composition_warnings(actual_counts, expected_counts)
        if warnings:
            return self._build_result(set_cis, set_gtin, set_name, 'WARNING', '; '.join(warnings))
        return self._build_result(set_cis, set_gtin, set_name, 'OK', 'Composition is valid')

    def _check_composition_errors(self, actual_counts: Counter, expected_counts: Dict[str, int]) -> List[str]:
        """Return one message per expected GTIN that is missing or has the wrong count."""
        errors = []
        for expected_gtin, expected_count in expected_counts.items():
            actual_count = actual_counts.get(expected_gtin, 0)
            if actual_count == 0:
                errors.append(f'Missing GTIN {expected_gtin} (expected {expected_count})')
            elif actual_count != expected_count:
                errors.append(f'Wrong count for GTIN {expected_gtin}: got {actual_count}, expected {expected_count}')
        return errors

    def _check_composition_warnings(self, actual_counts: Counter, expected_counts: Dict[str, int]) -> List[str]:
        """Return one message per GTIN present in the pack but absent from the rules."""
        return [
            f'Unexpected GTIN {actual_gtin} (count: {actual_count})'
            for actual_gtin, actual_count in actual_counts.items()
            if actual_gtin not in expected_counts
        ]
class XMLGenerator:
    """Builds ``<pack_content>`` sections and merges them into an XML template."""

    @staticmethod
    def escape_xml_content(text: str) -> str:
        """Make *text* safe to embed inside a ``<![CDATA[ ... ]]>`` section.

        CDATA needs no entity escaping, but the sequence ``]]>`` would end the
        section early. It is split across two adjacent CDATA sections, which
        parsers read back as the original text. Non-string input is converted
        with ``str`` first.
        """
        return str(text).replace(']]>', ']]]]><![CDATA[>')

    @staticmethod
    def generate_pack_content_xml(pack_data: Dict[str, List[str]]) -> List[str]:
        """Render one ``<pack_content>`` block per set code.

        Args:
            pack_data: mapping of set code to the item codes it contains.

        Returns:
            One multi-line string per set, in the iteration order of
            *pack_data*. Both the set code and the item codes are CDATA-escaped.
        """
        escape_cdata = XMLGenerator.escape_xml_content
        pack_contents = []
        for set_cis, cis_codes in pack_data.items():
            lines = [
                ' <pack_content>',
                f' <pack_code><![CDATA[{escape_cdata(set_cis)}]]></pack_code>',
            ]
            lines.extend(
                f' <cis><![CDATA[{escape_cdata(cis_code)}]]></cis>'
                for cis_code in cis_codes
            )
            lines.append(' </pack_content>')
            pack_contents.append('\n'.join(lines))
        return pack_contents

    @staticmethod
    def process_xml_template(template_path: str, pack_contents: List[str],
                             document_id: Optional[str] = None,
                             document_number: Optional[str] = None,
                             operation_time: Optional[str] = None) -> str:
        """Read the template, substitute parameters and insert the pack sections.

        Raises:
            OSError: if the template cannot be read.
            ValueError: if the template has no ``</Document>`` closing tag.
        """
        with open(template_path, 'r', encoding='utf-8') as file:
            template_content = file.read()
        template_content = XMLGenerator._replace_template_parameters(
            template_content, document_id, document_number, operation_time
        )
        return XMLGenerator._insert_pack_content(template_content, pack_contents)

    @staticmethod
    def _replace_template_parameters(template_content: str,
                                     document_id: Optional[str],
                                     document_number: Optional[str],
                                     operation_time: Optional[str]) -> str:
        """Overwrite the ``document_id``, ``document_number`` and
        ``operation_date_time`` attributes for every parameter that is set.

        Empty or ``None`` parameters leave the template value untouched.
        """
        substitutions = (
            ('document_id', document_id),
            ('document_number', document_number),
            ('operation_date_time', operation_time),
        )
        for attribute, value in substitutions:
            if not value:
                continue
            # Escape &, <, > and the double quote so the attribute stays well-formed.
            replacement = f'{attribute}="{escape(value, {chr(34): "&quot;"})}"'
            # A callable replacement is inserted literally; a string replacement
            # would have backslashes and group references in the value expanded.
            template_content = re.sub(
                rf'{attribute}="[^"]*"',
                lambda _match, text=replacement: text,
                template_content,
            )
        return template_content

    @staticmethod
    def _insert_pack_content(template_content: str, pack_contents: List[str]) -> str:
        """Drop the template's placeholder ``<pack_content>`` sections and insert
        the generated ones just before the first line containing ``</Document>``.

        Processing is line-based. Lines after the insertion point are copied
        unchanged.

        Raises:
            ValueError: if no ``</Document>`` line is found outside a
                placeholder, since the result would silently contain no data.
        """
        result_lines = []
        inserted = False
        inside_placeholder = False
        for line in template_content.split('\n'):
            if inserted:
                result_lines.append(line)
                continue
            open_at = line.rfind('<pack_content>')
            close_at = line.rfind('</pack_content>')
            if open_at != -1 or close_at != -1:
                # The tag that comes last on the line decides the state, so a
                # placeholder opened and closed on one line does not swallow
                # the rest of the template.
                inside_placeholder = open_at > close_at
                continue
            if inside_placeholder:
                continue
            if '</Document>' in line:
                result_lines.extend(pack_contents)
                inserted = True
            result_lines.append(line)
        if not inserted:
            raise ValueError(
                "Template has no closing </Document> tag; "
                "pack_content sections could not be inserted"
            )
        return '\n'.join(result_lines)
class ParameterGenerator:
    """Supplies default document parameters and checks user-provided ones."""

    @staticmethod
    def generate_document_id() -> str:
        """Return ``unit_pack_`` followed by the 32 upper-case hex digits of a random UUID."""
        token = uuid.uuid4().hex.upper()
        return "unit_pack_" + token

    @staticmethod
    def generate_operation_time() -> str:
        """Return the current local time, without a UTC offset, as an ISO 8601 string."""
        now = datetime.now()
        return now.isoformat()

    @staticmethod
    def validate_operation_time(operation_time: str) -> bool:
        """Tell whether *operation_time* parses as an ISO 8601 timestamp.

        A ``Z`` suffix is accepted by rewriting it to ``+00:00`` before parsing.
        """
        normalized = operation_time.replace('Z', '+00:00')
        try:
            datetime.fromisoformat(normalized)
        except ValueError:
            return False
        return True
class ValidationReporter:
    """Prints validation outcomes to the console."""

    @staticmethod
    def report_validation_summary(validation_results: List[Dict[str, str]]) -> Tuple[int, int, int]:
        """Print a one-line tally and return it as ``(ok, warnings, errors)``."""
        tally = Counter(result['status'] for result in validation_results)
        errors = tally['ERROR']
        warnings = tally['WARNING']
        ok = tally['OK']
        click.echo(f"Validation results: {ok} OK, {warnings} warnings, {errors} errors")
        return ok, warnings, errors

    @staticmethod
    def report_validation_details(validation_results: List[Dict[str, str]], show_ok: bool = False):
        """Print one line per result.

        Errors go to stderr, warnings to stdout. Any other status is printed
        as OK, and only when *show_ok* is true.
        """
        for result in validation_results:
            status = result['status']
            if status == 'ERROR':
                click.echo(f"❌ ERROR: {result['set_cis']} - {result['message']}", err=True)
                continue
            if status == 'WARNING':
                click.echo(f"⚠️ WARNING: {result['set_cis']} - {result['message']}")
                continue
            if show_ok:
                click.echo(f"✅ OK: {result['set_cis']} - {result['message']}")
class DryRunReporter:
    """Prints what a real run would process, without producing output files."""

    @staticmethod
    def report_pack_data_preview(pack_data: Dict[str, List], max_items: int = 3):
        """Print the first *max_items* sets, each with at most two of its item codes.

        Items may be plain code strings or dicts carrying a ``cis_code`` key.
        """
        click.echo("\nDry run - would process:")
        preview = list(pack_data.items())[:max_items]
        for set_cis, items in preview:
            click.echo(f" SET CIS: {set_cis}")
            click.echo(f" CIS codes: {len(items)} items")
            for item in items[:2]:  # only the first two codes of each set
                shown = item['cis_code'] if isinstance(item, dict) else item
                click.echo(f" - {shown}")
            hidden_codes = len(items) - 2
            if hidden_codes > 0:
                click.echo(f" ... and {hidden_codes} more")
        hidden_sets = len(pack_data) - max_items
        if hidden_sets > 0:
            click.echo(f" ... and {hidden_sets} more SET CIS codes")

    @staticmethod
    def report_parameters(document_id: str, document_number: Optional[str], operation_time: str):
        """Print the document parameters a real run would write into the XML."""
        number_text = document_number or 'Not specified'
        click.echo("\nWould use parameters:")
        click.echo(f" Document ID: {document_id}")
        click.echo(f" Document Number: {number_text}")
        click.echo(f" Operation Time: {operation_time}")
class XMLGeneratorApp:
    """Wires the reader, validator, XML generator and reporters together."""

    def __init__(self, input_file: str, template_file: str, cis_column: str = "Код",
                 set_dict_file: Optional[str] = None):
        """Store the paths and build the collaborators.

        When *set_dict_file* is given, the set dictionary is loaded right
        here and a validator is created for it; otherwise both stay ``None``.
        """
        self.input_file = input_file
        self.template_file = template_file
        self.cis_column = cis_column
        self.set_dict_file = set_dict_file
        # Always-present components
        self.data_reader = DataReader()
        self.xml_generator = XMLGenerator()
        self.param_generator = ParameterGenerator()
        self.validation_reporter = ValidationReporter()
        self.dry_run_reporter = DryRunReporter()
        # Components that exist only when a set dictionary is supplied
        self.set_dictionary = None
        self.validator = None
        if set_dict_file:
            self.set_dictionary = SetDictionary(set_dict_file)
            self.validator = PackValidator(self.set_dictionary)

    def load_data(self) -> Dict[str, List]:
        """Read the input file and return ``{set code: [item dict, ...]}``.

        With a set dictionary each item dict carries ``cis_code``, ``set_gtin``
        and ``gtin``; without one it carries only ``cis_code``.
        """
        click.echo(f"Reading file: {self.input_file}")
        click.echo(f"Using CIS column: {self.cis_column}")
        if self.set_dict_file:
            return self.data_reader.read_data_with_gtins(self.input_file, self.cis_column)
        pack_data_simple = self.data_reader.read_data_simple(self.input_file, self.cis_column)
        # Wrap plain codes in dicts so both modes share one item shape.
        return {
            set_cis: [{'cis_code': cis} for cis in cis_codes]
            for set_cis, cis_codes in pack_data_simple.items()
        }

    def load_validation_rules(self) -> Optional["SetDictionary"]:
        """Report the set dictionary loaded in ``__init__`` and return it (``None`` without one)."""
        if not self.set_dict_file:
            return None
        click.echo(f"Loading set dictionary: {self.set_dict_file}")
        click.echo(f"Loaded {self.set_dictionary.get_rule_count()} set rules")
        return self.set_dictionary

    def validate_data(self, pack_data: Dict[str, List[Dict[str, str]]]) -> Tuple[List[Dict[str, str]], bool]:
        """Validate the packs and return ``(results, has_errors)``.

        Without a validator nothing is checked and ``([], False)`` is returned.
        """
        if not self.validator:
            return [], False
        click.echo("Validating pack composition...")
        validation_results = self.validator.validate_composition(pack_data)
        _, _, errors = self.validation_reporter.report_validation_summary(validation_results)
        return validation_results, errors > 0

    def generate_parameters(self, document_id: Optional[str], document_number: Optional[str],
                            operation_time: Optional[str]) -> Tuple[str, Optional[str], str]:
        """Fill in a missing document ID and operation time.

        A supplied operation time that does not parse only triggers a warning
        on stderr; the value is still returned unchanged.
        """
        if not document_id:
            document_id = self.param_generator.generate_document_id()
        if not operation_time:
            operation_time = self.param_generator.generate_operation_time()
        elif not self.param_generator.validate_operation_time(operation_time):
            click.echo(f"Warning: Invalid operation time format: {operation_time}", err=True)
        return document_id, document_number, operation_time

    def process_dry_run(self, pack_data: Dict[str, List], validation_results: List[Dict[str, str]],
                        document_id: str, document_number: Optional[str], operation_time: str):
        """Print validation details (if any), a data preview and the parameters."""
        if validation_results:
            self.validation_reporter.report_validation_details(validation_results, show_ok=True)
        self.dry_run_reporter.report_pack_data_preview(pack_data)
        self.dry_run_reporter.report_parameters(document_id, document_number, operation_time)

    @staticmethod
    def _flatten_pack_data(pack_data: Dict[str, List]) -> Dict[str, List[str]]:
        """Reduce every item to its bare code: dict items yield ``item['cis_code']``,
        anything else is kept as-is."""
        return {
            set_cis: [item['cis_code'] if isinstance(item, dict) else item for item in items]
            for set_cis, items in pack_data.items()
        }

    def generate_xml_output(self, pack_data: Dict[str, List], document_id: str,
                            document_number: Optional[str], operation_time: str) -> str:
        """Render *pack_data* into the template and return the XML text.

        ``load_data`` returns item dicts in both modes, so the items are
        always flattened to plain codes here; keying this on the presence of
        a set dictionary would write dict reprs into ``<cis>`` without one.
        """
        simple_pack_data = self._flatten_pack_data(pack_data)
        click.echo("Generating pack_content sections...")
        pack_contents = self.xml_generator.generate_pack_content_xml(simple_pack_data)
        click.echo(f"Processing template: {self.template_file}")
        return self.xml_generator.process_xml_template(
            self.template_file, pack_contents, document_id, document_number, operation_time
        )

    def save_or_print_output(self, xml_content: str, output_file: Optional[str]):
        """Write the XML to *output_file* as UTF-8.

        Nothing is printed to stdout despite the method name: when no output
        path is given, one is derived from the input path by replacing its
        extension with ``_output.xml``.
        """
        if not output_file:
            import os
            base_name, _ = os.path.splitext(self.input_file)
            output_file = f"{base_name}_output.xml"
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(xml_content)
        click.echo(f"XML generated successfully: {output_file}")
@click.command()
@click.argument('input_file', type=click.Path(exists=True, dir_okay=False))
@click.argument('template_file', type=click.Path(exists=True, dir_okay=False))
@click.option('--output', '-o', type=click.Path(dir_okay=False),
              help='Output XML file path. If not specified, generates [input_filename]_output.xml in the same directory.')
@click.option('--cis-column', '-c', default='CIS',
              help='Legacy column name for CIS codes in data file if ITEM_CODE is not found (default: "CIS")')
@click.option('--dry-run', is_flag=True,
              help='Show what would be processed without generating output')
@click.option('--set-dict', type=click.Path(exists=True, dir_okay=False),
              help='Path to set dictionary (XLSX/CSV) file for validation')
@click.option('--document-id', type=str,
              help='Document ID to use in XML (auto-generated if not provided)')
@click.option('--document-number', type=str,
              help='Document number to use in XML')
@click.option('--operation-time', type=str,
              help='Operation time in ISO format (auto-generated if not provided)')
@click.option('--validate-only', is_flag=True,
              help='Only validate composition without generating XML')
def generate_xml(input_file: str, template_file: str, output: Optional[str], cis_column: str,
                 dry_run: bool, set_dict: Optional[str], document_id: Optional[str],
                 document_number: Optional[str], operation_time: Optional[str], validate_only: bool):
    """
    Generate XML file from XLSX/CSV data and template with validation.

    INPUT_FILE: Path to XLSX or CSV file containing SET CIS and CIS codes
    TEMPLATE_FILE: Path to XML template file

    The INPUT_FILE should contain columns:
    - 'SET_CODE' (or 'SET CIS'): Pack codes that will become <pack_code> elements
    - 'ITEM_CODE' (or column specified by --cis-column): Individual CIS codes that will become <cis> elements
    - 'SET_GTIN' (or 'SET GTIN'): SET GTIN codes for validation
    - 'ITEM_GTIN' (or 'GTIN'): Individual GTIN codes for validation
    """
    # The docstring above is the --help text, so implementation notes live here:
    # failure paths end with exit status 1 through sys.exit (SystemExit is not
    # an Exception subclass, so the handler below does not intercept it);
    # unexpected exceptions are printed and turned into click.Abort.
    try:
        app = XMLGeneratorApp(input_file, template_file, cis_column, set_dict)
        app.load_validation_rules()

        pack_data = app.load_data()
        if not pack_data:
            click.echo("No data found in file or no matching columns.", err=True)
            sys.exit(1)
        click.echo(f"Found {len(pack_data)} unique SET CIS codes")

        # Runs only when a set dictionary was supplied; otherwise ([], False).
        validation_results, has_errors = app.validate_data(pack_data)

        # A plain dry run prints the details itself (with OK lines) inside
        # process_dry_run, so printing them here too would duplicate them.
        details_printed_by_dry_run = dry_run and not validate_only
        if validation_results and not details_printed_by_dry_run:
            app.validation_reporter.report_validation_details(validation_results, show_ok=dry_run)

        if has_errors and not dry_run and not validate_only:
            click.echo("Errors found. Use --dry-run to see all issues or fix them before generating XML.", err=True)
            sys.exit(1)

        if validate_only:
            click.echo("Validation complete. Use without --validate-only to generate XML.")
            if has_errors:
                sys.exit(1)
            return

        document_id, document_number, operation_time = app.generate_parameters(
            document_id, document_number, operation_time
        )

        if dry_run:
            app.process_dry_run(pack_data, validation_results, document_id, document_number, operation_time)
            return

        xml_content = app.generate_xml_output(pack_data, document_id, document_number, operation_time)
        app.save_or_print_output(xml_content, output)
    except Exception as e:
        click.echo(f"Error: {str(e)}", err=True)
        raise click.Abort() from e
# Invoke the click command only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    generate_xml()