kodexa.pipeline

A Pipeline brings together a connector, a set of steps, and a sink to perform data cleansing, normalization, analysis, and more.
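The connector, steps, and sink arrangement can be illustrated without kodexa at all. The following is a minimal stand-alone sketch, not the library's implementation: `run_pipeline`, its parameters, and the sample strings are hypothetical names chosen for illustration.

```python
from typing import Callable, Iterable, List


def run_pipeline(
    connector: Iterable[str],
    steps: List[Callable[[str], str]],
    sink: Callable[[str], None],
) -> int:
    """Pull each item from the connector, apply every step in order,
    and hand the result to the sink. Returns the number of items handled."""
    count = 0
    for document in connector:
        for step in steps:
            document = step(document)
        sink(document)
        count += 1
    return count


# Hypothetical data: a list stands in for the connector, a list's append for the sink.
collected: List[str] = []
processed = run_pipeline(
    connector=["  Hello World ", "KODEXA  "],
    steps=[str.strip, str.lower],
    sink=collected.append,
)
```

Here the connector is any iterable, each step is a function from document to document, and the sink simply receives the final result.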

Package Contents

Classes

PipelineContext

Pipeline context is created when you create a pipeline and it provides a way to access information about the pipeline that is running.

Pipeline

A pipeline represents a way to bring together parts of the kodexa framework to solve a specific problem.

PipelineStatistics

A set of statistics for the processed document

LabelStep

A simple step for handling the labelling for a document

Functions

new_id()

kodexa.pipeline.new_id()
class kodexa.pipeline.PipelineContext(content_provider=None, existing_content_objects=None, context=None, execution_id=None, status_handler=None, cancellation_handler=None)

Pipeline context is created when you create a pipeline and it provides a way to access information about the pipeline that is running. It can be made available to steps/functions so they can interact with it.

It also provides access to the ‘stores’ that have been added to the pipeline.
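The constructor accepts a `status_handler` and a `cancellation_handler`, and the context exposes `update_status` and `is_cancelled`. How kodexa invokes those handlers is not documented here, so the sketch below is an assumption: `SketchContext` is a hypothetical stand-in (not `PipelineContext`) that calls the status handler with both messages and treats the cancellation handler as a zero-argument function returning a boolean.

```python
from typing import Callable, List, Optional, Tuple


class SketchContext:
    """Hypothetical stand-in for illustration; not kodexa's PipelineContext."""

    def __init__(
        self,
        status_handler: Optional[Callable[[str, Optional[str]], None]] = None,
        cancellation_handler: Optional[Callable[[], bool]] = None,
    ) -> None:
        self.status_handler = status_handler
        self.cancellation_handler = cancellation_handler

    def update_status(self, status_message: str, status_full_message: Optional[str] = None) -> None:
        # Assumption: the handler receives the short and the full message.
        if self.status_handler is not None:
            self.status_handler(status_message, status_full_message)

    def is_cancelled(self) -> bool:
        # Assumption: no handler means the run can never be cancelled.
        if self.cancellation_handler is None:
            return False
        return bool(self.cancellation_handler())


messages: List[Tuple[str, Optional[str]]] = []
context = SketchContext(
    status_handler=lambda short, full: messages.append((short, full)),
    # Simulate an external cancel request arriving after two status updates.
    cancellation_handler=lambda: len(messages) >= 2,
)


def long_running_step(ctx: SketchContext, pages: List[int]) -> int:
    """Process pages one at a time, checking for cancellation before each."""
    done = 0
    for page in pages:
        if ctx.is_cancelled():
            ctx.update_status("Cancelled", f"Stopped after {done} pages")
            break
        done += 1
        ctx.update_status(f"Processed page {page}")
    return done


pages_done = long_running_step(context, [1, 2, 3])
```

A step that receives the context can poll `is_cancelled` between units of work and report progress through `update_status`, which is the interaction the class description refers to.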

update_status(status_message: str, status_full_message: Optional[str] = None)

is_cancelled() -> bool

get_context() -> Dict

get_content_objects() -> List[kodexa.model.ContentObject]

get_content(content_object: kodexa.model.ContentObject)
Parameters

content_object (ContentObject)

put_content(content_object: kodexa.model.ContentObject, content)
Parameters
  • content_object (ContentObject)

  • content

set_current_document(current_document: kodexa.model.Document)

Set the Document that is currently being processed in the pipeline

Parameters

current_document (Document) – The current document

get_current_document() -> kodexa.model.Document

Get the current document that is being processed in the pipeline

Returns

The current document, or None

set_output_document(output_document: kodexa.model.Document)

Set the output document from the pipeline

Parameters

output_document (Document) – the final output document from the pipeline

Returns

the final output document

class kodexa.pipeline.Pipeline(connector=None, name: str = 'Default', stop_on_exception: bool = True, logging_level=logger.info, apply_lineage: bool = True)

A pipeline represents a way to bring together parts of the kodexa framework to solve a specific problem.

When you create a Pipeline you must provide the connector that will be used to source the documents.

Parameters
  • connector – the connector that will be the starting point for the pipeline

  • name – the name of the pipeline (default ‘Default’)

  • stop_on_exception – Should the pipeline raise exceptions and stop (default True)

  • logging_level – The logging level of the pipeline (default INFO)

>>> pipeline = Pipeline(FolderConnector(path='/tmp/', file_filter='example.pdf'))

context: PipelineContext
add_label(label: str, options=None, attach_source=False)

Adds a label to the document

Parameters
  • label (str) – label to add

  • options – options to be passed to the step if it is a simplified remote action (Default value = None)

  • attach_source – if the step is a simplified remote action, this determines whether the source is attached (Default value = False)

Returns

the pipeline

remove_label(label: str, options=None, attach_source=False)

Removes a label from the document

Parameters
  • label (str) – label to remove

  • options – options to be passed to the step if it is a simplified remote action (Default value = None)

  • attach_source – if the step is a simplified remote action, this determines whether the source is attached (Default value = False)

Returns

the pipeline

add_step(step, name=None, options=None, attach_source=False, step_type='ACTION')

Add the given step to the current pipeline

>>> pipeline = Pipeline(FolderConnector(path='/tmp/', file_filter='example.pdf'))
>>> pipeline.add_step(ExampleStep())

Note that it is also possible to add a function as a step, for example:

>>> def my_function(doc):
...     doc.metadata.fishstick = 'foo'
...     return doc
>>> pipeline.add_step(my_function)

If you are using remote actions on a server, or deploying to a remote pipeline, you can also use a shorthand:

>>> pipeline.add_step('kodexa/html-parser', options={'summarize': False})

Parameters
  • step – the step to add

  • name – the name to use to describe the step (default None)

  • options – options to be passed to the step if it is a simplified remote action (Default value = None)

  • attach_source – if the step is a simplified remote action, this determines whether the source is attached (Default value = False)

  • step_type – the type of step to add, either ACTION or MODEL

Returns

the instance of the pipeline
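Since add_step accepts both step objects and plain functions, a pipeline needs some way to call either kind. The sketch below shows one possible dispatch and is an assumption, not kodexa's code: `apply_step`, `UppercaseStep`, and the dict-based document are hypothetical, and the only detail taken from this page is that step classes such as LabelStep expose a `process(document)` method.

```python
from typing import Any, Dict

Document = Dict[str, Any]  # hypothetical stand-in for a real document type


class UppercaseStep:
    """Hypothetical step object exposing process(document)."""

    def process(self, document: Document) -> Document:
        document["text"] = document["text"].upper()
        return document


def add_marker(document: Document) -> Document:
    """Plain function used as a step, mirroring my_function above."""
    document["metadata"]["fishstick"] = "foo"
    return document


def apply_step(step: Any, document: Document) -> Document:
    """Call step.process(document) when available, otherwise call the step itself."""
    if hasattr(step, "process"):
        return step.process(document)
    if callable(step):
        return step(document)
    raise TypeError(f"Unsupported step type: {type(step).__name__}")


document: Document = {"text": "hello", "metadata": {}}
for step in (UppercaseStep(), add_marker):
    document = apply_step(step, document)
```

String shorthands such as 'kodexa/html-parser' refer to remote actions and are left out of this sketch, since resolving them depends on the platform.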
to_yaml()

Will return the YAML representation of any actions that support conversion to YAML

The YAML representation for RemoteActions can be used for metadata-only pipelines in the Kodexa Platform

Returns

YAML representation

run(parameters=None)

Run the current pipeline

Parameters

parameters – (Default value = None)

Returns

The context from the run

>>> pipeline = Pipeline(FolderConnector(path='/tmp/', file_filter='example.pdf'))
>>> pipeline.run()
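The constructor's stop_on_exception flag decides whether a failing step ends the run. The exact behaviour inside run() is not spelled out here, so the following is a sketch under assumptions: `run_steps` is a hypothetical helper, and collecting failures instead of raising is one plausible reading of stop_on_exception=False.

```python
from typing import Any, Callable, Iterable, List, Tuple


def run_steps(
    documents: Iterable[Any],
    steps: List[Callable[[Any], Any]],
    stop_on_exception: bool = True,
) -> Tuple[List[Any], List[Tuple[Any, Exception]]]:
    """Apply every step to every document.

    With stop_on_exception=True the first error propagates to the caller.
    Otherwise the failing input is recorded and processing continues.
    """
    results: List[Any] = []
    failures: List[Tuple[Any, Exception]] = []
    for original in documents:
        document = original
        try:
            for step in steps:
                document = step(document)
        except Exception as error:
            if stop_on_exception:
                raise
            failures.append((original, error))
            continue
        results.append(document)
    return results, failures


# int("x") raises ValueError, so the second input fails.
results, failures = run_steps(["1", "x", "3"], [int], stop_on_exception=False)
```

With the default stop_on_exception=True, the same input would raise ValueError at the second item and nothing after it would be processed.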
static from_url(url, headers=None, *args, **kwargs)

Build a new pipeline with the input being a document created from the given URL

Parameters
  • url – The URL, e.g. https://www.google.com

  • headers – A dictionary of headers (Default value = None)

  • *args

  • **kwargs

Returns

A new instance of a pipeline

static from_file(file_path: str, *args, **kwargs) -> Pipeline

Create a new pipeline using a file path as a source

Parameters
  • file_path (str) – The path to the file

  • *args

  • **kwargs

Returns

A new pipeline

Return type

Pipeline

static from_text(text: str, *args, **kwargs) -> Pipeline

Build a new pipeline and provide text as the basis to create a document

Parameters
  • text (str) – Text to use to create the document

  • *args

  • **kwargs

Returns

A new pipeline

Return type

Pipeline

static from_folder(folder_path: str, filename_filter: str = '*', recursive: bool = False, relative: bool = False, unpack=False, caller_path: str = get_caller_dir(), *args, **kwargs) -> Pipeline

Create a pipeline that will run against a set of local files from a folder

Parameters
  • folder_path (str) – The folder path

  • filename_filter (str) – The filter for filenames, e.g. *.pdf (Default value = “*”)

  • recursive (bool) – Should we look recursively in sub-directories (Default value = False)

  • relative (bool) – Is the folder path relative to the caller (Default value = False)

  • caller_path (str) – The caller path (defaults to trying to work this out from the stack, Default value = get_caller_dir())

  • unpack – Treat the files in the folder as KDXA documents and unpack them using from_kdxa (Default value = False)

  • *args

  • **kwargs

Returns

A new pipeline

Return type

Pipeline
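To make the filename_filter and recursive parameters concrete, here is a stand-alone sketch of glob-style file selection using only the standard library. It is an assumption about the general technique, not kodexa's folder handling: `list_matching_files` is a hypothetical helper, and the temporary files exist only for the demonstration.

```python
import fnmatch
import tempfile
from pathlib import Path
from typing import List


def list_matching_files(folder_path: str, filename_filter: str = "*", recursive: bool = False) -> List[str]:
    """Return sorted paths (relative to folder_path) of files whose name matches the filter."""
    root = Path(folder_path)
    # rglob descends into sub-directories, glob stays at the top level.
    candidates = root.rglob("*") if recursive else root.glob("*")
    return sorted(
        path.relative_to(root).as_posix()
        for path in candidates
        if path.is_file() and fnmatch.fnmatch(path.name, filename_filter)
    )


with tempfile.TemporaryDirectory() as folder:
    (Path(folder) / "a.pdf").write_text("a")
    (Path(folder) / "notes.txt").write_text("n")
    (Path(folder) / "sub").mkdir()
    (Path(folder) / "sub" / "b.pdf").write_text("b")

    top_level = list_matching_files(folder, "*.pdf")
    everything = list_matching_files(folder, "*.pdf", recursive=True)
```

With recursive left at False only the top-level a.pdf is selected; turning it on also picks up sub/b.pdf.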

class kodexa.pipeline.PipelineStatistics

A set of statistics for the processed document

documents_processed

document_exceptions

processed_document(document)

Update statistics based on this document completing processing

Parameters

document – the document that has been processed
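The two counters can be pictured with a small stand-alone sketch. It rests on assumptions that this page does not confirm: `SketchStatistics` is a hypothetical stand-in, documents are assumed to carry an `exceptions` list, and document_exceptions is assumed to count documents that have at least one exception.

```python
from types import SimpleNamespace


class SketchStatistics:
    """Hypothetical stand-in for illustration; not kodexa's PipelineStatistics."""

    def __init__(self) -> None:
        self.documents_processed = 0
        self.document_exceptions = 0

    def processed_document(self, document) -> None:
        # Every completed document is counted once.
        self.documents_processed += 1
        # Assumption: a non-empty `exceptions` attribute marks a failed document.
        if getattr(document, "exceptions", None):
            self.document_exceptions += 1


statistics = SketchStatistics()
for document in (
    SimpleNamespace(exceptions=[]),
    SimpleNamespace(exceptions=["parse error"]),
    SimpleNamespace(exceptions=[]),
):
    statistics.processed_document(document)
```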

class kodexa.pipeline.LabelStep(label: str, remove=False)

Bases: object

A simple step for handling the labelling for a document

process(document: kodexa.model.Document)
Parameters

document (Document)
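A labelling step of this shape can be sketched as follows. The constructor arguments `label` and `remove` and the `process(document)` method come from the signature above; everything else is an assumption: `SketchDocument`, its `labels` list, and the duplicate-avoiding behaviour are hypothetical and do not describe kodexa's Document.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SketchDocument:
    """Hypothetical document that only tracks labels."""
    labels: List[str] = field(default_factory=list)


class SketchLabelStep:
    """Hypothetical stand-in for illustration; not kodexa's LabelStep."""

    def __init__(self, label: str, remove: bool = False) -> None:
        self.label = label
        self.remove = remove

    def process(self, document: SketchDocument) -> SketchDocument:
        if self.remove:
            # Removing a label that is absent is treated as a no-op.
            if self.label in document.labels:
                document.labels.remove(self.label)
        elif self.label not in document.labels:
            # Avoid adding the same label twice.
            document.labels.append(self.label)
        return document


doc = SketchDocument()
SketchLabelStep("invoice").process(doc)
SketchLabelStep("invoice").process(doc)
labels_after_add = list(doc.labels)
SketchLabelStep("invoice", remove=True).process(doc)
labels_after_remove = list(doc.labels)
```

The single `remove` flag is what lets one class serve both add_label and remove_label on the pipeline.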