Example 1 (W2 data extraction)

"""
example 1: W2 form data extraction
"""
import base64
from pathlib import Path
from azure.ai.documentintelligence.models import AnalyzeDocumentRequest
from utility import client, is_file_or_url, load_file_as_base64

# Locate the sample W2 image relative to the current working directory.
document_dir = Path('./documents')
w2_dir = document_dir / 'w2'
file_path = w2_dir / 'W2_Clean_DataSet_01' / 'W2_XL_input_clean_1000.jpg'

if not file_path.exists():
    raise FileNotFoundError(f'File {file_path} not found')

model_id = "prebuilt-tax.us.w2"

document_ai_client = client()

# doc_source = '<doc url>'
doc_source = file_path

# Classify the source once so both branches test the same answer.
source_kind = is_file_or_url(str(doc_source))

if source_kind == 'url':
    print('Doc is a url')
    poller = document_ai_client.begin_analyze_document(
        model_id, AnalyzeDocumentRequest(url_source=str(doc_source))
    )
elif source_kind == 'file':
    print('Doc is a file')
    poller = document_ai_client.begin_analyze_document(
        model_id, {"base64Source": load_file_as_base64(doc_source)}
    )
else:
    # Without this branch `poller` would be unbound and the script would die
    # below with a NameError that hides the real problem.
    raise ValueError(f'Unsupported document source: {doc_source!r}')

# Wait for the analysis operation to finish and fetch its result.
result = poller.result()

# Top-level keys of the analysis result, for example:
# dict_keys(['apiVersion', 'modelId', 'stringIndexType', 'content', 'pages', 'styles', 'documents', 'contentFormat'])
print(result.keys())

pages = result['pages']
print('Document Page Total:', len(pages))
print(result['modelId'])

# Only the first page is inspected in detail.
first_page = pages[0]
print(first_page.keys())
print(first_page['pageNumber'])  # document page number
print(first_page['words'])  # words in the document page

# A line is an ordered sequence of consecutive content elements separated by a visual space
for line_number, page_line in enumerate(first_page['lines'], start=1):
    print(f'Line {line_number}:', page_line['content'])

print(result['content'])
print(result['contentFormat'])

print('Document count :', len(result.documents))

def _nested(node, *keys, default=''):
    """Follow *keys* through nested mapping-like nodes.

    Returns *default* as soon as a level is missing (or holds ``None``)
    instead of raising ``KeyError``, so one absent sub-field does not abort
    the whole dump.
    """
    for key in keys:
        if node is None:
            return default
        node = node.get(key)
    return default if node is None else node


def _print_confidence(field):
    """Print the confidence of *field* as a percentage when it has one."""
    confidence = field.get('confidence') if field else None
    if confidence is not None:
        print('Confidence:', confidence * 100.0, '%')


def _print_content_and_confidence(label, field):
    """Print ``<label>: <content>`` followed by the field's confidence."""
    print(f'{label}:', _nested(field, 'content'))
    _print_confidence(field)


# Header fields that are printed as "<label>: <content>" only.
_HEADER_FIELDS = (
    ('W2-Form', 'W2FormVariant'),
    ('Tax Year', 'TaxYear'),
    ('W2 Copy', 'W2Copy'),
)

# Numbered boxes printed as content plus confidence, in form order.
_NUMBERED_BOXES = (
    ('Box1', 'WagesTipsAndOtherCompensation'),
    ('Box2', 'FederalIncomeTaxWithheld'),
    ('Box3', 'SocialSecurityWages'),
    ('Box4', 'SocialSecurityTaxWithheld'),
    ('Box5', 'MedicareWagesAndTips'),
    ('Box6', 'MedicareTaxWithheld'),
    ('Box7', 'SocialSecurityTips'),
    ('Box8', 'AllocatedTips'),
    ('Box10', 'DependentCareBenefits'),
    ('Box11', 'NonQualifiedPlans'),
)

# Box 13 check boxes, printed from their 'valueString'.
_BOX13_FLAGS = (
    ('Statutory Employee', 'IsStatutoryEmployee'),
    ('Retire Plan', 'IsRetirementPlan'),
    ('Third-Party Sick Pay', 'IsThirdPartySickPay'),
)

for document in result.documents:
    # >>> document.keys()
    # dict_keys(['docType', 'boundingRegions', 'fields', 'confidence', 'spans'])
    print('Doc type:', document['docType'])
    print('Bounding Area:', document['boundingRegions'])
    print('Confidence:', document['confidence'] * 100.0, '%')
    # a span refers to a specific segment of text within a document,
    print('Spans:', document['spans'])

    document_fields = document['fields']

    for label, field_name in _HEADER_FIELDS:
        field = document_fields.get(field_name)
        if field:
            print(f'{label}:', _nested(field, 'content'))

    if document_fields.get('Employer'):
        employer = _nested(document_fields['Employer'], 'valueObject', default={})
        employer_address = _nested(employer, 'Address', 'valueAddress', default={})
        print('Full Address:', _nested(employer, 'Address', 'content'))
        print('Employer ID:', _nested(employer, 'IdNumber', 'valueString'))
        print('Employer:', _nested(employer, 'Name', 'valueString'))
        print('Address:', _nested(employer_address, 'streetAddress'))
        print('City:', _nested(employer_address, 'city'))
        print('State:', _nested(employer_address, 'state'))
        print('Postal Code:', _nested(employer_address, 'postalCode'))

    if document_fields.get('ControlNumber'):
        _print_content_and_confidence('Control Number', document_fields['ControlNumber'])

    if document_fields.get('Employee'):
        employee = _nested(document_fields['Employee'], 'valueObject', default={})
        print('Employee:', _nested(employee, 'Name', 'content'))
        print('Address:', _nested(employee, 'Address', 'valueAddress', 'streetAddress'))
        # The confidence reported here belongs to the employee address field.
        _print_confidence(employee.get('Address'))

    for label, field_name in _NUMBERED_BOXES:
        field = document_fields.get(field_name)
        if field:
            _print_content_and_confidence(label, field)

    # box 12a - 12d (the suffix is derived from the index so that an
    # unexpected fifth entry is labelled 12e instead of raising IndexError)
    additional_entries = _nested(document_fields.get('AdditionalInfo'), 'valueArray', default=[])
    for indx, value_field in enumerate(additional_entries):
        box_label = f"Box12{chr(ord('a') + indx)}"
        entry = _nested(value_field, 'valueObject', default={})
        if 'LetterCode' in entry:
            print(box_label, 'Letter:', _nested(entry, 'LetterCode', 'valueString'))
        else:
            print(box_label, None)

        if 'Amount' in entry:
            print(box_label, 'Amount:', _nested(entry, 'Amount', 'content'))

    for label, field_name in _BOX13_FLAGS:
        field = document_fields.get(field_name)
        if field:
            print(f'Box13 {label}:', _nested(field, 'valueString'))

    state_entries = _nested(document_fields.get('StateTaxInfos'), 'valueArray', default=[])
    for indx, value_field in enumerate(state_entries, start=1):
        state_info = _nested(value_field, 'valueObject', default={})
        print(f'state_{indx}_Box15a:', _nested(state_info, 'State', 'content'))
        print(f'state_{indx}_Box15b:', _nested(state_info, 'EmployerStateIdNumber', 'content'))
        print(f'state_{indx}_Box16:', _nested(state_info, 'StateWagesTipsEtc', 'content'))
        print(f'state_{indx}_Box17:', _nested(state_info, 'StateIncomeTax', 'content'))

    local_entries = _nested(document_fields.get('LocalTaxInfos'), 'valueArray', default=[])
    for indx, value_field in enumerate(local_entries, start=1):
        local_info = _nested(value_field, 'valueObject', default={})
        print(f'local_{indx}_Box18:', _nested(local_info, 'LocalWagesTipsEtc', 'content'))
        print(f'local_{indx}_Box19:', _nested(local_info, 'LocalIncomeTax', 'content'))
        print(f'local_{indx}_Box20:', _nested(local_info, 'LocalityName', 'content'))

    if document_fields.get('Other'):
        _print_content_and_confidence('Box14', document_fields['Other'])

    print('-----------------------------------')



Example 2 (Invoice extraction)
"""
Example 2. Extract invoice detail
"""
from pathlib import Path
from azure.ai.documentintelligence.models import AnalyzeDocumentRequest
from utility import client, load_file_as_base64

# Resolve the sample invoice relative to the current working directory.
document_dir = Path('./documents')
invoice_dir = document_dir.joinpath('invoice')
file_path = invoice_dir.joinpath('TC-0964-21.pdf')

if not file_path.exists():
    raise FileNotFoundError(f'File {file_path} not found')

model_id = 'prebuilt-invoice'
doc_source = file_path
document_ai_client = client()

# The file is sent inline as base64 rather than by URL.
file_base64 = load_file_as_base64(doc_source)
analyze_body = {"base64Source": file_base64}
poller = document_ai_client.begin_analyze_document(model_id, analyze_body, locale="en-US")
result = poller.result()

print('Document count :', len(result.documents))

for document in result.documents:
    document_fields = document['fields']
    print(document_fields.keys())

    for field_name, field in document_fields.items():
        if field_name != 'Items':
            # Ordinary field: print its raw text, or '' when it has none.
            value = field.get('content', '')
            print(f'{field_name} : {value}')
            print('---')
            continue

        # 'Items' holds one object per invoice line; flatten each object
        # into a {sub-field name: content} dict.
        items_list = [
            {
                sub_name: sub_field.get('content', '')
                for sub_name, sub_field in line_item['valueObject'].items()
            }
            for line_item in field['valueArray']
        ]
        print(items_list)
        print('---')




Example 3 (Table extraction)
from pathlib import Path
import pandas as pd
from azure.ai.documentintelligence.models import AnalyzeDocumentRequest
from utility import client, load_file_as_base64

document_dir = Path('./documents')

file_path = document_dir / 'citi_bank_statement.pdf'

if not file_path.exists():
    raise FileNotFoundError(f'File {file_path} not found')

model_id = 'prebuilt-layout'

doc_source = file_path

document_ai_client = client()

file_base64 = load_file_as_base64(doc_source)
poller = document_ai_client.begin_analyze_document(
    model_id,
    {"base64Source": file_base64},
    locale="en-US",
)

result = poller.result()
# result.keys()
# dict_keys(['apiVersion', 'modelId', 'stringIndexType', 'content', 'pages', 'tables', 'paragraphs', 'styles', 'contentFormat', 'sections', 'figures'])


def _table_to_dataframe(table):
    """Convert one extracted table into a DataFrame, using row 0 as the header.

    Positions with no cell are filled with ``None``. A table with zero rows
    yields an empty DataFrame instead of failing on the missing header row.
    """
    # Index the cells once by position. setdefault keeps the first cell seen
    # for a position, matching a first-match scan over table.cells.
    content_at = {}
    for cell in table.cells:
        content_at.setdefault((cell.row_index, cell.column_index), cell.content)

    grid = [
        [content_at.get((row_idx, column_idx)) for column_idx in range(table.column_count)]
        for row_idx in range(table.row_count)
    ]
    if not grid:
        return pd.DataFrame()
    return pd.DataFrame(grid[1:], columns=grid[0])


# Normalise a missing table list to [] before calling len() on it.
extracted_tables = result.tables or []
print('Number of tables:', len(extracted_tables))
tables = [_table_to_dataframe(table) for table in extracted_tables]

for indx, table_frame in enumerate(tables):
    print(f'Table {indx+1}')
    print(table_frame)
    print('\n\n')

 

utility.py

import os
import configparser
import base64
from urllib.parse import urlparse
from azure.core.credentials import AzureKeyCredential
from azure.ai.documentintelligence import DocumentIntelligenceClient

def client(config_path='client.ini'):
    """Build a Document Intelligence client from an INI configuration file.

    Args:
        config_path: Path of the INI file to read. It must provide a
            ``[DocumentAI]`` section with ``api_key`` and ``endpoint``
            options. Defaults to ``'client.ini'``, which is resolved against
            the current working directory.

    Returns:
        A ``DocumentIntelligenceClient`` authenticated with an
        ``AzureKeyCredential`` built from the configured key.

    Raises:
        configparser.NoSectionError: If the ``[DocumentAI]`` section is
            missing. This is also what a missing or unreadable file produces,
            because ``ConfigParser.read`` skips such files silently.
        configparser.NoOptionError: If ``api_key`` or ``endpoint`` is missing.
    """
    config = configparser.ConfigParser()
    config.read(config_path)

    api_key = config.get('DocumentAI', 'api_key')
    # an endpoint is a URL at which a web service can be accessed by a client application.
    endpoint = config.get('DocumentAI', 'endpoint')

    # Named differently from the function so the local does not shadow it.
    service_client = DocumentIntelligenceClient(
        endpoint=endpoint, credential=AzureKeyCredential(api_key)
    )
    return service_client

def is_file_or_url(input_string):
    """Classify a document source as a local file, a web URL, or neither.

    Args:
        input_string: A string, or a path-like object such as
            ``pathlib.Path``, naming the document source.

    Returns:
        ``'file'`` if the value names an existing regular file, ``'url'`` if
        it parses with an ``http`` or ``https`` scheme, otherwise
        ``'unknown'``. The file check runs first.
    """
    # urlparse() cannot handle path-like objects, so convert them to their
    # string form first; plain strings pass through untouched.
    if isinstance(input_string, os.PathLike):
        input_string = os.fspath(input_string)

    if os.path.isfile(input_string):
        return 'file'
    elif urlparse(input_string).scheme in ['http', 'https']:
        return 'url'
    else:
        return 'unknown'
    
def load_file_as_base64(file_path):
    """Read a file in binary mode and return its contents base64-encoded.

    Args:
        file_path: Path of the file to read; passed straight to ``open``.

    Returns:
        The file's bytes encoded as base64 and decoded to a ``str``.
    """
    with open(file_path, "rb") as handle:
        return base64.b64encode(handle.read()).decode('utf-8')

if __name__ == "__main__":
    # Smoke test: build a client from the default configuration and show it.
    # The result gets its own name so the `client` factory stays callable.
    document_client = client()
    print(document_client)