The WriteCSVFileState TemplateΒΆ

WriteCSVFileState

smacha/src/smacha/templates/WriteCSVFileState.tpl.py

{% block meta %}
name: WriteCSVFileState
description:
  SMACH state that writes a comma-separated values (CSV) file to the file system
  based on userdata input.
language: Python
framework: SMACH
type: State
tags: [core]
includes:
extends:
- State
variables:
- input_keys:
    description:
        The userdata input keys, e.g. the data_keys to be written to file.
    type: list of str
- - output_keys:
      description:
          The userdata output keys, e.g. needed by the optionally specified
          callbacks.
      type: list of str
- - callbacks:
      description:
        Either callback function names or backtick-wrapped lambda functions for,
        e.g. modifying the data_keys.
      type: dict of str
- - delimiter:
      description:
        The symbol to be used to separate data columns in the CSV file.
        Defaults to `,`.
      type: list of str
input_keys:
- file:
    description:
      The CSV file path.
    type: str
- data_keys:
    description:
      The names of the input_keys to be written to file.
    type: list of str
output_keys: []
outcomes:
- succeeded
- aborted
{% endblock meta %}

{% from "Utils.tpl.py" import import_module, from_import, render_init_callbacks, render_execute_callbacks, render_callbacks, render_input_keys, render_output_keys, render_transitions, render_remapping %}

{% extends "State.tpl.py" %}

{% block imports %}
{{ super() }}
{{ import_module(defined_headers, 'csv') }}
{% endblock imports %}

{% block class_defs %}
{{ super() }}
{% if 'class_WriteCSVFileState' not in defined_headers %}
class WriteCSVFileState(smach.State):
    """
    This state formats input keys specified by the 'data_keys' input key and writes
    them to a CSV file specified by the 'file' input key.
    """
    def __init__(self, delimiter=',', input_keys = ['file', 'data_keys'], output_keys = [], callbacks=None):
        smach.State.__init__(self, outcomes=['succeeded', 'aborted'], input_keys=input_keys, output_keys=output_keys)

        # Save delimiter
        self._delimiter = delimiter

        # Default to ignoring header slots in ROS message types
        # NOTE: This refers to the ROS msg header, NOT the CSV header.
        self._ignore = ['header']

        {{ render_init_callbacks() }}

    def _parse_data_key(self, header, data, data_key, name='', ignore=[]):
        """
        Recursively parse a data key (e.g. a ROS msg type) and return a list of CSV header names and CSV data entries.
        """
        try:
            for slot in data_key.__slots__:
                if slot in ignore:
                    continue
                try:
                    slot_header, slot_data = self._parse_data_key(header, data, getattr(data_key, slot),
                                                                  name=name + '_' + slot, ignore=ignore)
                except:
                    continue
            return header, data
        except:
            try:
                header.append(name)
                data.append(data_key)
                return header, data
            except:
                raise

    def execute(self, userdata):

        {{ render_execute_callbacks() }}

        # Set up ignore slots, if they have been defined by the user
        if 'ignore' in self._input_keys:
            self._ignore = userdata.ignore

        # Set up header
        header = []

        # Set up data
        data = []

        # Try treating data_keys as a singleton ROS type with an iterable slot  
        try:
            # Set up CSV header
            if 'header' in self._input_keys:
                header = userdata.header
                user_defined_header = True
            else:
                header = []
                user_defined_header = False

            # Ensure data_keys contains a single entry
            assert len(userdata.data_keys == 1)
            data_key = userdata.data_keys[0]

            # Look for an iterable slot in the data key
            for slot in data_key.__slots__:
                # Ignore any slots that should be ignored
                if slot in self._ignore:
                    pass
                else:
                    try:
                        for slot_iter_data_key in getattr(data_key, slot):
                            try:
                                if user_defined_header:
                                    _, slot_iter_data_key_data = (
                                            self._parse_data_key([], [], slot_iter_data_key,
                                                                 name=slot, ignore=self._ignore))
                                else:
                                    header, slot_iter_data_key_data = (
                                            self._parse_data_key([], [], slot_iter_data_key,
                                                                 name=slot, ignore=self._ignore))
                                data.append(slot_iter_data_key_data)
                            except:
                                break
                        if data:
                            break
                    except:
                        pass
            assert data
        except:
            # Try iterating through data_keys, using the slots from each item
            # as data entries.
            try:
                # Set up CSV header
                if 'header' in self._input_keys:
                    header = userdata.header
                    user_defined_header = True
                else:
                    header = []
                    user_defined_header = False

                for data_key in userdata.data_keys:
                    # If there is a user-defined header, just parse the data for the data_key entry.
                    if user_defined_header:
                        _, data_key_data = self._parse_data_key([], [], getattr(userdata, data_key),
                                                                name=data_key, ignore=self._ignore)
                    # If not, parse both the data and the header for the data_key entry.
                    else:
                        data_key_header, data_key_data = self._parse_data_key([], [], getattr(userdata, data_key),
                                                                              name=data_key, ignore=self._ignore)
                        header = header + data_key_header
                    data = data + data_key_data
                assert data
                # Ensure data is a list of lists, even if it contains just one list,
                # for the benefit of csv.writer.writerows()
                data = [data]
            except Exception as e:
                smach.logwarn('Failed to parse data keys for CSV file writing: {}'.format(repr(e)))
                return 'aborted'

        # Write to CSV file
        try:
            with open(userdata.file, 'w') as fp:
                writer = csv.writer(fp, delimiter=self._delimiter)
                if header:
                    writer.writerow(header)
                writer.writerows(data)
        except Exception as e:
            smach.logwarn('Failed to write to CSV file \'{}\': {}'.format(userdata.file, repr(e)))
            return 'aborted'

        # Write the header as output key if required.
        if 'header' in self._output_keys:
            userdata.header = header

        return 'succeeded'
{% do defined_headers.append('class_WriteCSVFileState') %}{% endif %}
{% endblock class_defs %}

{% block body %}
smach.{{ parent_type }}.add('{{ name }}',
{{ '' | indent(23, true) }}WriteCSVFileState({% if delimiter is defined %}delimiter='{{ delimiter }}'{% endif %}{% if input_keys is defined %}{% if delimiter is defined %}, {% endif %}{{ render_input_keys(input_keys, indent=0) }}{% endif %}{% if output_keys is defined %}{% if input_keys is defined or delimiter is defined %}, {% endif %}{{ render_output_keys(output_keys, indent=0) }}{% endif %}{% if callbacks is defined %}, {{ render_callbacks(name, uuid, callbacks, indent=0) }}{% endif %}){% if transitions is defined %},
{{ render_transitions(transitions) }}{% endif %}{% if remapping is defined %},
{{ render_remapping(remapping) }}{% endif %})
{% endblock body %}