Python xml parser code ScheduleDirect.py

Discussion about Schedules Direct grabber code and data formats.
Post Reply
kmedcalf
Posts: 7
Joined: Sat Sep 13, 2008 7:01 am

Python xml parser code ScheduleDirect.py

Post by kmedcalf »

The attached code is sample Python code which uses a very simple expat parser to convert the ddata.xml file as returned by TMSWebService.py (the raw TMS XML Data) into a stream of record tuples. There is minimal error checking or processing of the raw XML data other than the conversion of all "times" into Unix Epoch time values (as in from time.time()), and all intervals/durations into seconds. There is some rolling of child nodes into parents and renaming of attributes so that a more reasonable record stream is returned -- one which can be easily normalized for storage in a relational database. This code is released into the Public Domain and you are free to use it or modify it without restriction.

Code: Select all

# This module parses using a very simple expat parser the ddata.xml file returned by
# the TMSWebService.py sample program.  It is a "Record Generator" that yields tuples
# of (recordName, progress, attributeDictionary).  All time values are converted to
# Unix Epoch times (as returned by time.time()), and all intervals are in seconds.
# Some minor roll-up of the raw data is performed here, and minor renaming of some of
# the attributes designed to make the returned record stream easier to process.
#
# Error checking is mostly non-existent; however, it does demonstrate a simple xml.expat parser
# returning data records that can be further processed to build a relational database from the
# data.
#
# The code is released into the Public Domain.
# If you break it, you own both halves.
# You are free to use or modify this code.
#
# Original Code by Keith Medcalf, kmedcalf@dessus.com
# Released to SchedulesDirect on 21 January 2012

import codecs
import calendar
import encodings
import gzip
import os
import Queue
import threading
import xml.parsers.expat


# Stack of `element` objects for the XML elements that are currently open;
# start_element pushes onto it and end_element removes the top entry.
elementStack = []
# Queue of (name, progress, attr) record tuples; created by XMLDataFileParser
# and filled by processRecord.  A None entry marks the end of the stream.
recordQueue = None
# File object being parsed (set by parseScheduleDirect); processRecord calls
# tell() on it to compute the progress percentage.
fHandle = None
# Size of that file as a float, measured by parseScheduleDirect.
fSize = None


class element(object):
    """Book-keeping record for one XML element on the parser stack.

    name -- tag name of the element
    text -- list of character-data chunks collected so far (starts as [''])
    attr -- attribute mapping supplied by the caller (None when omitted)
    """

    def __init__(self, name=u'', attrs=None):
        self.name, self.text, self.attr = name, [''], attrs


def start_element(name, attrs):
    """Start-tag handler: push a fresh element record onto elementStack.

    name  -- tag name of the element being opened
    attrs -- attribute mapping for that element
    """
    opened = element(name, attrs)
    elementStack.append(opened)


def end_element(name):
    """End-tag handler: fold the closed element into its parent or emit it.

    name -- tag name being closed; must match the element on top of
            elementStack (checked with an assert).

    An element with no attributes stores its non-empty joined text in the
    parent's attribute dict under its own tag name.  An element that does
    have attributes is handled according to its tag:

      part (inside schedule)      -> parent gets integer 'part' / 'parts'
      member (inside crew)        -> merged with the crew attributes and
                                     emitted under the parent's name
      map (inside lineup)         -> gets 'lineup' from the parent's id, emitted
      genre (inside programGenre) -> merged with the parent attributes, emitted
      advisories (inside program) -> gets 'program' from the parent's id, emitted
      pure container / envelope tags -> ignored
      anything else               -> emitted as-is through processRecord

    Errors raised while handling the element are ignored on purpose (best
    effort); the element is removed from elementStack afterwards.
    """
    # Tags that only wrap other records and never produce a record themselves.
    containerTags = ('programGenre', 'crew', 'givenname', 'xtvd', 'xtvdResponse',
                     'ns1:downloadResponse', 'SOAP-ENV:Envelope')

    assert elementStack[-1].name == name
    top = elementStack[-1]
    # The root element has no parent; previously this case was handled by
    # swallowing the IndexError, now it is skipped explicitly.
    if len(elementStack) > 1:
        # Called 'parent' rather than 'next' so the builtin is not shadowed.
        parent = elementStack[-2]
        try:
            if len(top.attr) == 0:
                data = ''.join(top.text)
                if len(data) > 0:
                    parent.attr[top.name] = data
            elif top.name == 'part' and parent.name == 'schedule':
                parent.attr['part'] = int(top.attr['number'])
                parent.attr['parts'] = int(top.attr['total'])
            elif top.name == 'member' and parent.name == 'crew':
                top.attr.update(parent.attr)
                processRecord(parent.name, top.attr)
            elif top.name == 'map' and parent.name == 'lineup':
                top.attr['lineup'] = parent.attr['id']
                processRecord(top.name, top.attr)
            elif top.name == 'genre' and parent.name == 'programGenre':
                top.attr.update(parent.attr)
                processRecord(top.name, top.attr)
            elif top.name == 'advisories' and parent.name == 'program':
                top.attr['program'] = parent.attr['id']
                processRecord(top.name, top.attr)
            elif top.name in containerTags:
                pass
            else:
                processRecord(top.name, top.attr)
        except Exception:
            # Best effort: a malformed element is dropped instead of aborting
            # the parse.  Unlike a bare except, this still lets
            # KeyboardInterrupt and SystemExit propagate.
            pass
    del elementStack[-1]


def char_data(data):
    """Character-data handler: collect text for the innermost open element.

    A chunk that is exactly one newline is dropped; every other chunk is
    appended unchanged to the text list of the element on top of elementStack.
    """
    if data == '\n':
        return
    current = elementStack[-1]
    current.text.append(data)


def dString2stamp(tstring):
    """Convert a duration string such as 'PT01H30M' into a number of seconds.

    tstring -- duration text made of optional hour ('H'), minute ('M') and
               second ('S') components, optionally prefixed with 'P' / 'T'.
               A trailing bare number is taken as minutes, because the
               original implementation tolerated a missing 'M'.

    Returns the duration in seconds as an int.
    Raises ValueError when the text does not contain any duration component.

    The original implementation required both hours and minutes and failed
    on values such as 'PT30M' or 'PT1H30M15S'.
    """
    import re

    match = re.match(r'^P?T?(?:(\d+)H)?(?:(\d+)(?:M|$))?(?:(\d+)S)?$',
                     tstring.strip())
    if match is None or not any(match.groups()):
        raise ValueError('invalid duration: %r' % (tstring,))
    # Components that are absent from the string count as zero.
    hh, mm, ss = [int(part) if part else 0 for part in match.groups()]
    return ((hh * 60) + mm) * 60 + ss


def tString2stamp(tstring):
    """Convert a 'YYYY-MM-DD[(T| )HH:MM:SS][Z]' string to a Unix epoch value.

    A trailing 'Z' is dropped.  The date and time may be separated by 'T' or
    by a single space; with no separator the whole string is the date and the
    time defaults to 00:00:00.  The fields are interpreted by
    calendar.timegm, i.e. as UTC.
    """
    if tstring[-1] == 'Z':
        tstring = tstring[:-1]
    timepart = '00:00:00'
    for separator in ('T', ' '):
        if separator in tstring:
            datepart, timepart = tstring.split(separator)
            break
    else:
        # No separator found: date only.
        datepart = tstring
    year, month, day = [int(piece) for piece in datepart.split('-')]
    hour, minute, second = [int(piece) for piece in timepart.split(':')]
    return calendar.timegm((year, month, day, hour, minute, second))


def fixtitle(ts):
    """Move a leading article ('A ' or 'The ', any case) to the end of a title.

    'The Office' becomes 'Office, The'.  Titles without such a prefix are
    returned unchanged.
    """
    for article in ('a ', 'the '):
        width = len(article)
        head, tail = ts[:width], ts[width:]
        if head.lower() == article:
            return (tail + ', ' + head).strip()
    return ts


def processRecord(name, attr):
    """Normalise one record's attributes in place and queue the record.

    name -- record type (tag name)
    attr -- attribute dict of the record; modified in place

    An 'id' attribute is renamed to the record type unless that key already
    exists.  'map' records get 'validfrom' / 'validto' epoch values (with
    defaults when absent), 'schedule' records get epoch 'time', 'duration' in
    seconds, a computed 'endtime' and integer 1 for true flags, and 'program'
    records get 'runTime' in seconds and re-ordered titles.  The tuple
    (name, progress percentage, attr) is then put on recordQueue.
    """
    global fHandle
    if 'id' in attr and name not in attr:
        attr[name] = attr.pop('id')

    if name == 'map':
        # Rename the raw validity bounds, then make sure both exist.
        for rawKey, newKey in (('from', 'validfrom'), ('to', 'validto')):
            if rawKey in attr:
                attr[newKey] = attr.pop(rawKey)
        attr.setdefault('validfrom', '1980-01-01')
        attr.setdefault('validto', '2037-12-31')
        for key in ('validfrom', 'validto'):
            attr[key] = tString2stamp(attr[key])
    elif name == 'schedule':
        attr['time'] = begins = tString2stamp(attr['time'])
        attr['duration'] = length = dString2stamp(attr['duration'])
        attr['endtime'] = begins + length
        for flag in ('stereo', 'ei', 'subtitled', 'hdtv', 'new', 'dolby', 'closeCaptioned'):
            if flag not in attr:
                continue
            setting = attr[flag].lower()
            # A flag counts as set when its value is 'true' or its own name.
            if setting == 'true' or setting == flag.lower():
                attr[flag] = 1
    elif name == 'program':
        if 'runTime' in attr:
            attr['runTime'] = dString2stamp(attr['runTime'])
        for key in ('title', 'subtitle'):
            if key in attr:
                attr[key] = fixtitle(attr[key])

    # Progress is the current file position as a percentage of the file size.
    progress = float(fHandle.tell()) / fSize * 100.0
    recordQueue.put((name, progress, attr))


def _streamSize(f):
    """Return the number of readable bytes in *f* and rewind it to the start."""
    try:
        f.seek(0, 2)
        size = f.tell()
    except (ValueError, IOError):
        # Some file-like objects (e.g. gzip.GzipFile on Python 2) refuse to
        # seek relative to the end; count the bytes by reading them instead.
        f.seek(0, 0)
        size = 0
        while True:
            chunk = f.read(65536)
            if not chunk:
                break
            size += len(chunk)
    f.seek(0, 0)
    return size


def parseScheduleDirect(filename):
    """Parse *filename* with expat, feeding records into recordQueue.

    filename -- path of the XML file; a name ending in '.gz' (any case) is
                opened through gzip.

    The module-level fHandle / fSize are set while parsing so processRecord
    can report progress.  A None sentinel is always put on recordQueue when
    the function finishes, even if opening or parsing the file fails, so the
    consumer is never left waiting; the file is always closed.  Any error
    still propagates to the caller.
    """
    global recordQueue, fSize, fHandle
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_element
    p.EndElementHandler = end_element
    p.CharacterDataHandler = char_data
    try:
        if filename.lower().endswith('.gz'):
            f = gzip.open(filename, 'rb')
        else:
            f = open(filename, 'rb')
        try:
            fSize = float(_streamSize(f))
            fHandle = f
            p.ParseFile(f)
        finally:
            fHandle = None
            f.close()
    finally:
        # End-of-stream marker for XMLDataFileParser.
        recordQueue.put(None)


def XMLDataFileParser(filename='ddata.xml'):
    """Generator yielding the records parsed from *filename*.

    A bounded queue (1024 entries) is created and parseScheduleDirect is
    started in a daemon thread to fill it.  Each queued item is yielded as-is
    until a None item is received, which ends the generator.
    """
    global recordQueue
    recordQueue = Queue.Queue(1024)
    worker = threading.Thread(target=parseScheduleDirect, args=(filename,))
    worker.setDaemon(True)
    worker.start()
    record = recordQueue.get()
    while record is not None:
        yield record
        record = recordQueue.get()

if __name__ == '__main__':
    print 'Test ddata.xml decoding'
    for name, pct, attr in XMLDataFileParser('ddata.xml'):
        print '%-11s:' % (name, ), attr

kmedcalf
Posts: 7
Joined: Sat Sep 13, 2008 7:01 am

Re: Python xml parser code ScheduleDirect.py

Post by kmedcalf »

Advisories Records now stacked and output after the Program

Code: Select all

# This module parses using a very simple expat parser the ddata.xml file returned by
# the TMSWebService.py sample program.  It is a "Record Generator" that yields tuples
# of (recordName, progress, attributeDictionary).  All time values are converted to
# Unix Epoch times (as returned by time.time()), and all intervals are in seconds.
# Some minor roll-up of the raw data is performed here, and minor renaming of some of
# the attributes designed to make the returned record stream easier to process.
#
# Error checking is mostly non-existent; however, it does demonstrate a simple xml.expat parser
# returning data records that can be further processed to build a relational database from the
# data.
#
# Code which will build a relational database in SQLite from the returned data stream will follow.
#
# The code is released into the Public Domain.
# If you break it, you own both halves.
# You are free to use or modify this code.
#
# Original Code by Keith Medcalf, kmedcalf@dessus.com
# Released to SchedulesDirect on 21 January 2012

import codecs
import calendar
import encodings
import gzip
import os
import Queue
import threading
import xml.parsers.expat


# Stack of `element` objects for the XML elements that are currently open;
# start_element pushes onto it and end_element removes the top entry.
elementStack = []
# (name, attr-copy) tuples that end_element collects for each 'advisory'
# element; processRecord queues them right after a 'program' record and then
# resets the list.
advisoryStack = []
# Queue of (name, progress, attr) record tuples; created by XMLDataFileParser
# and filled by processRecord.  A None entry marks the end of the stream.
recordQueue = None
# File object being parsed (set by parseScheduleDirect); processRecord calls
# tell() on it to compute the progress percentage.
fHandle = None
# Size of that file as a float, measured by parseScheduleDirect.
fSize = None


class element(object):
    """Book-keeping record for one XML element on the parser stack.

    name -- tag name of the element
    text -- list of character-data chunks collected so far (starts as [''])
    attr -- attribute mapping supplied by the caller (None when omitted)
    """

    def __init__(self, name=u'', attrs=None):
        self.name, self.text, self.attr = name, [''], attrs


def start_element(name, attrs):
    """Start-tag handler: push a fresh element record onto elementStack.

    name  -- tag name of the element being opened
    attrs -- attribute mapping for that element
    """
    opened = element(name, attrs)
    elementStack.append(opened)


def end_element(name):
    """End-tag handler: fold the closed element into its parent or emit it.

    name -- tag name being closed; must match the element on top of
            elementStack (checked with an assert).

    An element with no attributes stores its non-empty joined text in the
    parent's attribute dict under its own tag name.  An element that does
    have attributes is handled according to its tag:

      part (inside schedule)      -> parent gets integer 'part' / 'parts'
      member (inside crew)        -> merged with the crew attributes and
                                     emitted under the parent's name
      map (inside lineup)         -> gets 'lineup' from the parent's id, emitted
      genre (inside programGenre) -> merged with the parent attributes, emitted
      pure container / envelope tags (including 'advisories') -> ignored
      anything else               -> emitted as-is through processRecord

    In addition, every closed 'advisory' element tags its parent with the id
    of the grandparent element (stored as 'program') and appends a copy of
    the parent's attributes to advisoryStack, from where processRecord emits
    them after the 'program' record.

    Errors raised while handling the element are ignored on purpose (best
    effort); the element is removed from elementStack afterwards.
    """
    global advisoryStack
    # Tags that only wrap other records and never produce a record themselves.
    containerTags = ('advisories', 'programGenre', 'crew', 'givenname', 'xtvd',
                     'xtvdResponse', 'ns1:downloadResponse', 'SOAP-ENV:Envelope')

    assert elementStack[-1].name == name
    top = elementStack[-1]
    # The root element has no parent; previously this case was handled by
    # swallowing the IndexError, now it is skipped explicitly.
    if len(elementStack) > 1:
        # Called 'parent' rather than 'next' so the builtin is not shadowed.
        parent = elementStack[-2]
        try:
            if len(top.attr) == 0:
                data = ''.join(top.text)
                if len(data) > 0:
                    parent.attr[top.name] = data
            elif top.name == 'part' and parent.name == 'schedule':
                parent.attr['part'] = int(top.attr['number'])
                parent.attr['parts'] = int(top.attr['total'])
            elif top.name == 'member' and parent.name == 'crew':
                top.attr.update(parent.attr)
                processRecord(parent.name, top.attr)
            elif top.name == 'map' and parent.name == 'lineup':
                top.attr['lineup'] = parent.attr['id']
                processRecord(top.name, top.attr)
            elif top.name == 'genre' and parent.name == 'programGenre':
                top.attr.update(parent.attr)
                processRecord(top.name, top.attr)
            elif top.name in containerTags:
                pass
            else:
                processRecord(top.name, top.attr)
            if top.name == 'advisory':
                # Snapshot the parent now: the next advisory overwrites the
                # rolled-up text stored under the same key.
                parent.attr['program'] = elementStack[-3].attr['id']
                advisoryStack.append((parent.name, parent.attr.copy()))
        except Exception:
            # Best effort: a malformed element is dropped instead of aborting
            # the parse.  Unlike a bare except, this still lets
            # KeyboardInterrupt and SystemExit propagate.
            pass
    del elementStack[-1]


def char_data(data):
    """Character-data handler: collect text for the innermost open element.

    A chunk that is exactly one newline is dropped; every other chunk is
    appended unchanged to the text list of the element on top of elementStack.
    """
    if data == '\n':
        return
    current = elementStack[-1]
    current.text.append(data)


def dString2stamp(tstring):
    """Convert a duration string such as 'PT01H30M' into a number of seconds.

    tstring -- duration text made of optional hour ('H'), minute ('M') and
               second ('S') components, optionally prefixed with 'P' / 'T'.
               A trailing bare number is taken as minutes, because the
               original implementation tolerated a missing 'M'.

    Returns the duration in seconds as an int.
    Raises ValueError when the text does not contain any duration component.

    The original implementation required both hours and minutes and failed
    on values such as 'PT30M' or 'PT1H30M15S'.
    """
    import re

    match = re.match(r'^P?T?(?:(\d+)H)?(?:(\d+)(?:M|$))?(?:(\d+)S)?$',
                     tstring.strip())
    if match is None or not any(match.groups()):
        raise ValueError('invalid duration: %r' % (tstring,))
    # Components that are absent from the string count as zero.
    hh, mm, ss = [int(part) if part else 0 for part in match.groups()]
    return ((hh * 60) + mm) * 60 + ss


def tString2stamp(tstring):
    """Convert a 'YYYY-MM-DD[(T| )HH:MM:SS][Z]' string to a Unix epoch value.

    A trailing 'Z' is dropped.  The date and time may be separated by 'T' or
    by a single space; with no separator the whole string is the date and the
    time defaults to 00:00:00.  The fields are interpreted by
    calendar.timegm, i.e. as UTC.
    """
    if tstring[-1] == 'Z':
        tstring = tstring[:-1]
    timepart = '00:00:00'
    for separator in ('T', ' '):
        if separator in tstring:
            datepart, timepart = tstring.split(separator)
            break
    else:
        # No separator found: date only.
        datepart = tstring
    year, month, day = [int(piece) for piece in datepart.split('-')]
    hour, minute, second = [int(piece) for piece in timepart.split(':')]
    return calendar.timegm((year, month, day, hour, minute, second))


def fixtitle(ts):
    """Move a leading article ('A ' or 'The ', any case) to the end of a title.

    'The Office' becomes 'Office, The'.  Titles without such a prefix are
    returned unchanged.
    """
    for article in ('a ', 'the '):
        width = len(article)
        head, tail = ts[:width], ts[width:]
        if head.lower() == article:
            return (tail + ', ' + head).strip()
    return ts


def processRecord(name, attr):
    """Normalise one record's attributes in place and queue the record.

    name -- record type (tag name)
    attr -- attribute dict of the record; modified in place

    An 'id' attribute is renamed to the record type unless that key already
    exists.  'map' records get 'validfrom' / 'validto' epoch values (with
    defaults when absent), 'schedule' records get epoch 'time', 'duration' in
    seconds, a computed 'endtime' and integer 1 for true flags, and 'program'
    records get 'runTime' in seconds and re-ordered titles.  The tuple
    (name, progress percentage, attr) is then put on recordQueue.  After a
    'program' record, every entry waiting in advisoryStack is queued as well
    and the stack is reset.
    """
    global fHandle, advisoryStack
    if 'id' in attr and name not in attr:
        attr[name] = attr.pop('id')

    if name == 'map':
        # Rename the raw validity bounds, then make sure both exist.
        for rawKey, newKey in (('from', 'validfrom'), ('to', 'validto')):
            if rawKey in attr:
                attr[newKey] = attr.pop(rawKey)
        attr.setdefault('validfrom', '1980-01-01')
        attr.setdefault('validto', '2037-12-31')
        for key in ('validfrom', 'validto'):
            attr[key] = tString2stamp(attr[key])
    elif name == 'schedule':
        attr['time'] = begins = tString2stamp(attr['time'])
        attr['duration'] = length = dString2stamp(attr['duration'])
        attr['endtime'] = begins + length
        for flag in ('stereo', 'ei', 'subtitled', 'hdtv', 'new', 'dolby', 'closeCaptioned'):
            if flag not in attr:
                continue
            setting = attr[flag].lower()
            # A flag counts as set when its value is 'true' or its own name.
            if setting == 'true' or setting == flag.lower():
                attr[flag] = 1
    elif name == 'program':
        if 'runTime' in attr:
            attr['runTime'] = dString2stamp(attr['runTime'])
        for key in ('title', 'subtitle'):
            if key in attr:
                attr[key] = fixtitle(attr[key])

    # Progress is the current file position as a percentage of the file size.
    progress = float(fHandle.tell()) / fSize * 100.0
    recordQueue.put((name, progress, attr))

    if name == 'program':
        # Emit the advisories gathered for this program right after it.
        for pendingName, pendingAttr in advisoryStack:
            pendingProgress = float(fHandle.tell()) / fSize * 100.0
            recordQueue.put((pendingName, pendingProgress, pendingAttr))
        advisoryStack = []


def _streamSize(f):
    """Return the number of readable bytes in *f* and rewind it to the start."""
    try:
        f.seek(0, 2)
        size = f.tell()
    except (ValueError, IOError):
        # Some file-like objects (e.g. gzip.GzipFile on Python 2) refuse to
        # seek relative to the end; count the bytes by reading them instead.
        f.seek(0, 0)
        size = 0
        while True:
            chunk = f.read(65536)
            if not chunk:
                break
            size += len(chunk)
    f.seek(0, 0)
    return size


def parseScheduleDirect(filename):
    """Parse *filename* with expat, feeding records into recordQueue.

    filename -- path of the XML file; a name ending in '.gz' (any case) is
                opened through gzip.

    The module-level fHandle / fSize are set while parsing so processRecord
    can report progress.  A None sentinel is always put on recordQueue when
    the function finishes, even if opening or parsing the file fails, so the
    consumer is never left waiting; the file is always closed.  Any error
    still propagates to the caller.
    """
    global recordQueue, fSize, fHandle
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_element
    p.EndElementHandler = end_element
    p.CharacterDataHandler = char_data
    try:
        if filename.lower().endswith('.gz'):
            f = gzip.open(filename, 'rb')
        else:
            f = open(filename, 'rb')
        try:
            fSize = float(_streamSize(f))
            fHandle = f
            p.ParseFile(f)
        finally:
            fHandle = None
            f.close()
    finally:
        # End-of-stream marker for XMLDataFileParser.
        recordQueue.put(None)


def XMLDataFileParser(filename='ddata.xml'):
    """Generator yielding the records parsed from *filename*.

    A bounded queue (1024 entries) is created and parseScheduleDirect is
    started in a daemon thread to fill it.  Each queued item is yielded as-is
    until a None item is received, which ends the generator.
    """
    global recordQueue
    recordQueue = Queue.Queue(1024)
    worker = threading.Thread(target=parseScheduleDirect, args=(filename,))
    worker.setDaemon(True)
    worker.start()
    record = recordQueue.get()
    while record is not None:
        yield record
        record = recordQueue.get()

if __name__ == '__main__':
    print 'Test ddata.xml decoding'
    for name, pct, attr in XMLDataFileParser('ddata.xml'):
        print '%-11s:' % (name, ), attr


Post Reply