summaryrefslogtreecommitdiff
path: root/py05
diff options
context:
space:
mode:
Diffstat (limited to 'py05')
-rwxr-xr-xpy05/ex0/data_processor.py236
-rwxr-xr-xpy05/ex1/data_stream.py278
-rwxr-xr-xpy05/ex2/data_pipeline.py338
3 files changed, 852 insertions, 0 deletions
diff --git a/py05/ex0/data_processor.py b/py05/ex0/data_processor.py
new file mode 100755
index 0000000..56166ac
--- /dev/null
+++ b/py05/ex0/data_processor.py
@@ -0,0 +1,236 @@
+#!/usr/bin/env python3
+
+from abc import ABC, abstractmethod
+import typing
+import sys
+
+
+class DataProcessor(ABC):
+ def __init__(self, name: str) -> None:
+ self.storage: typing.List[typing.Any] = []
+ self.rank: int = 0
+ self.counter: int = 0
+ self.name = name
+
+ @abstractmethod
+ def validate(self, data: typing.Any) -> bool:
+ pass
+
+ @abstractmethod
+ def ingest(self, data: typing.Any) -> None:
+ pass
+
+ def output(self) -> tuple[int, str]:
+ # extract the oldest piece of data
+ # piece of data is removed
+ position = self.rank
+ self.rank += 1
+ if isinstance(self.storage[0], dict):
+ entry = self.storage[0]
+ output_tuple: tuple = tuple(entry.values())
+ to_return = (f"{output_tuple[0]}: {output_tuple[1]}")
+ self.storage.pop(0)
+ return position, to_return
+ to_return = self.storage[0]
+ self.storage.pop(0)
+ return position, to_return
+
+
+class NumericProcessor(DataProcessor):
+
+ def validate(self, data: int | float | typing.List[int | float]) -> bool:
+ try:
+ if isinstance(data, int):
+ self.counter += 1
+ return True
+ elif isinstance(data, float):
+ self.counter += 1
+ return True
+ elif isinstance(data, typing.List):
+ type_list = "yes"
+ for item in data:
+ self.counter += 1
+ if not isinstance(item, int | float):
+ type_list = "no"
+ self.counter -= 1
+ if type_list == "yes":
+ return True
+ else:
+ raise ValueError()
+ else:
+ raise ValueError()
+ except ValueError:
+ return False
+
+ def ingest(self, data: int | float | typing.List[int | float]) -> None:
+ try:
+ if isinstance(data, typing.List):
+ for item in data:
+ if isinstance(item, int | float):
+ self.storage.append(item)
+ elif isinstance(data, int | float):
+ self.storage.append(data)
+ else:
+ raise ValueError()
+ except ValueError:
+ print("Improper numeric data", file=sys.stderr)
+
+
+def is_number(s: str) -> bool:
+ try:
+ float(s)
+ return True
+ except (ValueError, TypeError):
+ return False
+
+
+class TextProcessor(DataProcessor):
+
+ def validate(self, data: str | typing.List[str]) -> bool:
+ try:
+ if isinstance(data, str):
+ if is_number(data):
+ raise ValueError()
+ else:
+ self.counter += 1
+ return True
+ elif isinstance(data, typing.List):
+ for item in data:
+ if isinstance(item, str):
+ if is_number(item):
+ raise ValueError()
+ return False
+ elif isinstance(item, dict):
+ raise ValueError()
+ return False
+ else:
+ self.counter += 1
+ if isinstance(item, dict):
+ raise ValueError()
+ return False
+ return True
+ else:
+ raise ValueError()
+ except ValueError:
+ return False
+
+ def ingest(self, data: str | typing.List[str]) -> None:
+ try:
+ if isinstance(data, str):
+ self.storage.append(data)
+ elif isinstance(data, typing.List):
+ for item in data:
+ if is_number(item):
+ raise ValueError()
+ elif isinstance(item, dict):
+ raise ValueError()
+ else:
+ self.storage.append(item)
+ else:
+ raise ValueError()
+ except ValueError:
+ print("Improper text data", file=sys.stderr)
+
+
+class LogProcessor(DataProcessor):
+
+ def validate(self, data: dict | typing.List[dict]) -> bool:
+ try:
+ if isinstance(data, dict):
+ return True
+ if isinstance(data, typing.List):
+ for item in data:
+ if isinstance(item, dict):
+ self.counter += 1
+ else:
+ raise ValueError()
+ return False
+ return True
+ else:
+ raise ValueError()
+ except ValueError:
+ return False
+
+ def ingest(self, data: dict | typing.List[dict]) -> None:
+ try:
+ if isinstance(data, dict):
+ for item in data:
+ self.storage.append(item)
+ elif isinstance(data, typing.List):
+ for item in data:
+ if isinstance(item, dict):
+ self.storage.append(item)
+ else:
+ raise ValueError()
+ except ValueError:
+ print("Improper log data", file=sys.stderr)
+
+
+def testNumericProcessor() -> None:
+ is_num = NumericProcessor("Numeric Processor")
+ test_num: typing.List = [42, 42.0, 'FortyTwo']
+ for item in test_num:
+ print(f"Trying to validate input {item}: ", end='')
+ print(is_num.validate(item))
+ print("Testing invalid ingestion of string 'foo' without prior validation")
+ print("Got exception: ", end='')
+ # flush buffer
+ sys.stdout.flush()
+ # will raise a mypy error
+ is_num.ingest('foo')
+ is_num.ingest([1, 2, 3, 4, 5])
+ print(f"Processing data: {is_num.storage}")
+ print(f"Extracting {len(is_num.storage)} values...")
+ while is_num.storage:
+ position, value = is_num.output()
+ print(f"Numeric value {position} : {value}")
+
+
+def testTextProcessor() -> None:
+ is_text = TextProcessor("Text Processor")
+ # test_text: typing.List = ['42', '42.0', 'FortyTwo', ['More', '43']]
+ test_text: typing.List = ['42', '42.0', 'FortyTwo']
+ for item in test_text:
+ print(f"Trying to validate input {item}: ", end='')
+ print(is_text.validate(item))
+ test_list_text: typing.List = ['Hello', 'Nexus', 'FortyTwo']
+ print(f"Trying to validate '{test_list_text}': "
+ f"{is_text.validate(test_list_text)}")
+ is_text.ingest(test_list_text)
+ print(f"Processing data: {is_text.storage}")
+ print(f"Extracting {len(is_text.storage)} values...")
+ while is_text.storage:
+ position, value = is_text.output()
+ print(f"Text value {position} : {value}")
+
+
+def testLogProcessor() -> None:
+ log = [{'log_level': 'NOTICE',
+ 'log_message': 'Connection to server'},
+ {'log_level': 'ERROR',
+ 'log_message': 'Unauthorized access!!'}]
+ is_log = LogProcessor("Log Processor")
+ # will raise a mypy error
+ print(f"Trying to validate input 'Hello': {is_log.validate('Hello')}")
+ is_log.ingest(log)
+ print(f"Processing data: {is_log.storage}")
+ print(f"Extracting {len(is_log.storage)} values...")
+ while is_log.storage:
+ position, value = is_log.output()
+ print(f"Log entry {position} : {value}")
+
+
+def main() -> None:
+ print("=== Code Nexus - Data Processor ===")
+ print("Testing Numeric Processor...")
+ testNumericProcessor()
+ print()
+ print("Testing Text Processor...")
+ testTextProcessor()
+ print()
+ print("Testing Log Processor...")
+ testLogProcessor()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/py05/ex1/data_stream.py b/py05/ex1/data_stream.py
new file mode 100755
index 0000000..b457a24
--- /dev/null
+++ b/py05/ex1/data_stream.py
@@ -0,0 +1,278 @@
+#!/usr/bin/env python3
+
+from abc import ABC, abstractmethod
+import typing
+import sys
+
+
+class DataProcessor(ABC):
+ def __init__(self, name: str) -> None:
+ self.storage: typing.List[typing.Any] = []
+ self.rank: int = 0
+ self.counter: int = 0
+ self.name = name
+
+ @abstractmethod
+ def validate(self, data: typing.Any) -> bool:
+ pass
+
+ @abstractmethod
+ def ingest(self, data: typing.Any) -> None:
+ pass
+
+ def output(self) -> tuple[int, str]:
+ # extract the oldest piece of data
+ # piece of data is removed
+ position = self.rank
+ self.rank += 1
+ if isinstance(self.storage[0], dict):
+ entry = self.storage[0]
+ output_tuple: tuple = tuple(entry.values())
+ to_return = (f"{output_tuple[0]}: {output_tuple[1]}")
+ self.storage.pop(0)
+ return position, to_return
+ to_return = self.storage[0]
+ self.storage.pop(0)
+ return position, to_return
+
+
+class NumericProcessor(DataProcessor):
+
+ def validate(self, data: int | float | typing.List[int | float]) -> bool:
+ try:
+ if isinstance(data, int):
+ self.counter += 1
+ return True
+ elif isinstance(data, float):
+ self.counter += 1
+ return True
+ elif isinstance(data, typing.List):
+ type_list = "yes"
+ for item in data:
+ self.counter += 1
+ if not isinstance(item, int | float):
+ type_list = "no"
+ self.counter -= 1
+ if type_list == "yes":
+ return True
+ else:
+ raise ValueError()
+ else:
+ raise ValueError()
+ except ValueError:
+ return False
+
+ def ingest(self, data: int | float | typing.List[int | float]) -> None:
+ try:
+ if isinstance(data, typing.List):
+ for item in data:
+ if isinstance(item, int | float):
+ self.storage.append(item)
+ elif isinstance(data, int | float):
+ self.storage.append(data)
+ else:
+ raise ValueError()
+ except ValueError:
+ print("Improper numeric data", file=sys.stderr)
+
+
+def is_number(s: str) -> bool:
+ try:
+ float(s)
+ return True
+ except (ValueError, TypeError):
+ return False
+
+
+class TextProcessor(DataProcessor):
+
+ def validate(self, data: str | typing.List[str]) -> bool:
+ try:
+ if isinstance(data, str):
+ if is_number(data):
+ raise ValueError()
+ else:
+ self.counter += 1
+ return True
+ elif isinstance(data, typing.List):
+ for item in data:
+ if isinstance(item, str):
+ if is_number(item):
+ raise ValueError()
+ return False
+ else:
+ self.counter += 1
+ if isinstance(item, dict):
+ raise ValueError()
+ return False
+ return True
+ else:
+ raise ValueError()
+ except ValueError:
+ return False
+
+ def ingest(self, data: str | typing.List[str]) -> None:
+ try:
+ if isinstance(data, str):
+ self.storage.append(data)
+ elif isinstance(data, typing.List):
+ for item in data:
+ if is_number(item):
+ raise ValueError()
+ elif isinstance(item, dict):
+ raise ValueError()
+ else:
+ self.storage.append(item)
+ else:
+ raise ValueError()
+ except ValueError:
+ print("Improper text data", file=sys.stderr)
+
+
+class LogProcessor(DataProcessor):
+
+ def validate(self, data: dict | typing.List[dict]) -> bool:
+ try:
+ if isinstance(data, dict):
+ return True
+ if isinstance(data, typing.List):
+ for item in data:
+ if isinstance(item, dict):
+ self.counter += 1
+ else:
+ raise ValueError()
+ return False
+ return True
+ else:
+ raise ValueError()
+ except ValueError:
+ return False
+
+ def ingest(self, data: dict | typing.List[dict]) -> None:
+ try:
+ if isinstance(data, dict):
+ for item in data:
+ self.storage.append(item)
+ elif isinstance(data, typing.List):
+ for item in data:
+ if isinstance(item, dict):
+ self.storage.append(item)
+ else:
+ raise ValueError()
+ except ValueError:
+ print("Improper log data", file=sys.stderr)
+
+
+class DataStream():
+ def __init__(self) -> None:
+ self.processor: list = []
+ self.counter: int = 0
+
+ def register_processor(self, proc: DataProcessor) -> None:
+ # method that allows you to register a new data processor to process
+ # the data stream.
+ self.processor.append(proc)
+
+ def process_stream(self, stream: list[typing.Any]) -> None:
+ # method that will analyze each element of the list received as a
+ # parameter and send it to the appropriate registered data processor.
+ # Error messages will be printed if no data processor can handle an
+ # element
+ for item in stream:
+ try:
+ for processor in self.processor:
+ # route towards appropriate processors
+ if processor.validate(item):
+ processor.ingest(item)
+ # stop when appropriate processors is found
+ break
+ else:
+ raise ValueError()
+ except ValueError:
+ print(f"Data Stream Error - Can't process "
+ f"element in stream: {item}", file=sys.stderr)
+
+ def print_processors_stats(self) -> None:
+ if self.processor == []:
+ print("No processor found, no data")
+ else:
+ for processor in self.processor:
+ remaining = len(processor.storage)
+ print(f"{processor.name}: total {processor.counter} items "
+ f"processed, remaining {remaining} on processor")
+
+ def consume_processors(self, rm_1: int, rm_2: int, rm_3: int) -> None:
+ for processor in self.processor:
+ if processor.name == "Numeric Processor":
+ for _ in range(rm_1):
+ processor.output()
+ elif processor.name == "Text Processor":
+ for _ in range(rm_2):
+ processor.output()
+ elif processor.name == "Log Processor":
+ for _ in range(rm_3):
+ processor.output()
+
+
+def main() -> None:
+
+ # Create a test scenario that demonstrates the correct processing of a data
+ # stream. Display statistics on registered data processors, consume
+ # elements using the output method of each data processor and show updated
+ # statistics
+
+ print("=== Code Nexus - Data Stream ===")
+ print()
+ print("Initialize Data Stream...")
+ print()
+ processors = DataStream()
+ print("=== DataStream statistic ===")
+ processors.print_processors_stats()
+ print()
+ # initialize processors
+ is_num = NumericProcessor("Numeric Processor")
+ print("Registering Numeric Processor")
+ print()
+ processors.register_processor(is_num)
+ batch_one = ['Hello world',
+ [3.14, -1, 2.71],
+ [{
+ 'log_level': 'WARNING',
+ ' log_message': 'Telnet access! Use ssh instead'
+ },
+ {
+ 'log_level': 'INFO',
+ 'log_message': 'User wil is connected'
+ }],
+ 42,
+ ['Hi', 'five']]
+ print(f"Sending first batch of data on stream: {batch_one}")
+ processors.process_stream(batch_one)
+ print()
+ print("=== DataStream statistic ===")
+ processors.print_processors_stats()
+ print()
+ print("Registering other data processors")
+ is_text = TextProcessor("Text Processor")
+ is_log = LogProcessor("Log Processor")
+ processors.register_processor(is_text)
+ processors.register_processor(is_log)
+ print()
+ print("Send the same batch again")
+ processors.process_stream(batch_one)
+ print()
+ print("=== DataStream statistic ===")
+ processors.print_processors_stats()
+ print()
+ consume_num = 3
+ consume_text = 2
+ consume_log = 1
+ print(f"Consume some elements from the data processors:"
+ f" Numeric {consume_num}, Text {consume_text}, Log {consume_log}")
+ processors.consume_processors(consume_num, consume_text, consume_log)
+ print("=== DataStream statistic ===")
+ processors.print_processors_stats()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/py05/ex2/data_pipeline.py b/py05/ex2/data_pipeline.py
new file mode 100755
index 0000000..a558ac6
--- /dev/null
+++ b/py05/ex2/data_pipeline.py
@@ -0,0 +1,338 @@
+#!/usr/bin/env python3
+#
+# "Briefing: Your task is to integrate work from past exercices into a complete
+# data processing pipeline that demonstrates understanding of polymorphic
+# concepts. Use your code from Exercise 1 and improve it in order to obtain a
+# complete data pipeline. Your DataStream class already handles input streams.
+# You need now to handle the output part of the pipeline. This will be achieved
+# by using a plugin system for export classes, made export-compatible through
+# duck typing."
+#
+# https://typing.python.org/en/latest/spec/protocol.html
+
+from abc import ABC, abstractmethod
+import typing
+import sys
+
+
+class ExportPlugin(typing.Protocol):
+
+ def process_output(self, data: list[tuple[int, str]]) -> None:
+ # The type of the data parameter is a list of tuples that matches the
+ # return value of the output method from the DataProcessor class.
+ # output CSV or JSON
+ # if list output as CSV
+ ...
+
+
+class ExportCSV(ExportPlugin):
+
+ def process_output(self, data: list[tuple[int, str]]) -> None:
+ i = 0
+ print("CSV Output:")
+ for item in data:
+ print(data[i][1], end='')
+ if item is not data[-1]:
+ print(',', end='')
+ else:
+ print()
+ i += 1
+
+
+class ExportJSON(ExportPlugin):
+
+ def process_output(self, data: list[tuple[int, str]]) -> None:
+ i = 0
+ print("JSON Output:")
+ print("{", end='')
+ for item in data:
+ print(f"\"item_{data[i][0]}\": \"{data[i][1]}\"", end='')
+ if item is not data[-1]:
+ print(', ', end='')
+ else:
+ print(end='')
+ i += 1
+ print("}")
+
+
+class DataProcessor(ABC):
+ def __init__(self, name: str) -> None:
+ self.storage: typing.List[typing.Any] = []
+ self.rank: int = 0
+ self.counter: int = 0
+ self.name = name
+
+ @abstractmethod
+ def validate(self, data: typing.Any) -> bool:
+ pass
+
+ @abstractmethod
+ def ingest(self, data: typing.Any) -> None:
+ pass
+
+ def output(self) -> tuple[int, str]:
+ # extract the oldest piece of data
+ # piece of data is removed
+ position = self.rank
+ self.rank += 1
+ if isinstance(self.storage[0], dict):
+ entry = self.storage[0]
+ output_tuple: tuple = tuple(entry.values())
+ to_return = (f"{output_tuple[0]}: {output_tuple[1]}")
+ self.storage.pop(0)
+ return position, to_return
+ to_return = self.storage[0]
+ self.storage.pop(0)
+ return position, to_return
+
+
+class NumericProcessor(DataProcessor):
+
+ def validate(self, data: int | float | typing.List[int | float]) -> bool:
+ try:
+ if isinstance(data, int):
+ self.counter += 1
+ return True
+ elif isinstance(data, float):
+ self.counter += 1
+ return True
+ elif isinstance(data, typing.List):
+ type_list = "yes"
+ for item in data:
+ self.counter += 1
+ if not isinstance(item, int | float):
+ type_list = "no"
+ self.counter -= 1
+ if type_list == "yes":
+ return True
+ else:
+ raise ValueError()
+ else:
+ raise ValueError()
+ except ValueError:
+ return False
+
+ def ingest(self, data: int | float | typing.List[int | float]) -> None:
+ try:
+ if isinstance(data, typing.List):
+ for item in data:
+ if isinstance(item, int | float):
+ self.storage.append(item)
+ elif isinstance(data, int | float):
+ self.storage.append(data)
+ else:
+ raise ValueError()
+ except ValueError:
+ print("Improper numeric data", file=sys.stderr)
+
+
+def is_number(s: str) -> bool:
+ try:
+ float(s)
+ return True
+ except (ValueError, TypeError):
+ return False
+
+
+class TextProcessor(DataProcessor):
+
+ def validate(self, data: str | typing.List[str]) -> bool:
+ try:
+ if isinstance(data, str):
+ if is_number(data):
+ raise ValueError()
+ else:
+ self.counter += 1
+ return True
+ elif isinstance(data, typing.List):
+ for item in data:
+ if isinstance(item, str):
+ if is_number(item):
+ raise ValueError()
+ return False
+ else:
+ self.counter += 1
+ if isinstance(item, dict):
+ raise ValueError()
+ return False
+ return True
+ else:
+ raise ValueError()
+ except ValueError:
+ return False
+
+ def ingest(self, data: str | typing.List[str]) -> None:
+ try:
+ if isinstance(data, str):
+ self.storage.append(data)
+ elif isinstance(data, typing.List):
+ for item in data:
+ if is_number(item):
+ raise ValueError()
+ elif isinstance(item, dict):
+ raise ValueError()
+ else:
+ self.storage.append(item)
+ else:
+ raise ValueError()
+ except ValueError:
+ print("Improper text data", file=sys.stderr)
+
+
+class LogProcessor(DataProcessor):
+
+ def validate(self, data: dict | typing.List[dict]) -> bool:
+ try:
+ if isinstance(data, dict):
+ return True
+ if isinstance(data, typing.List):
+ for item in data:
+ if isinstance(item, dict):
+ self.counter += 1
+ else:
+ raise ValueError()
+ return False
+ return True
+ else:
+ raise ValueError()
+ except ValueError:
+ return False
+
+ def ingest(self, data: dict | typing.List[dict]) -> None:
+ try:
+ if isinstance(data, dict):
+ for item in data:
+ self.storage.append(item)
+ elif isinstance(data, typing.List):
+ for item in data:
+ if isinstance(item, dict):
+ self.storage.append(item)
+ else:
+ raise ValueError()
+ except ValueError:
+ print("Improper log data", file=sys.stderr)
+
+
+class DataStream():
+ def __init__(self) -> None:
+ self.processor: list = []
+ self.counter: int = 0
+
+ def register_processor(self, proc: DataProcessor) -> None:
+ # method that allows you to register a new data processor to process
+ # the data stream.
+ self.processor.append(proc)
+
+ def process_stream(self, stream: list[typing.Any]) -> None:
+ # method that will analyze each element of the list received as a
+ # parameter and send it to the appropriate registered data processor.
+ # Error messages will be printed if no data processor can handle an
+ # element
+ for item in stream:
+ try:
+ for processor in self.processor:
+ # route towards appropriate processors
+ if processor.validate(item):
+ processor.ingest(item)
+ # stop when appropriate processors is found
+ break
+ else:
+ raise ValueError()
+ except ValueError:
+ print(f"Data Stream Error - Can't process "
+ f"element in stream: {item}", file=sys.stderr)
+
+ def print_processors_stats(self) -> None:
+ if self.processor == []:
+ print("No processor found, no data")
+ else:
+ for processor in self.processor:
+ remaining = len(processor.storage)
+ print(f"{processor.name}: total {processor.counter} items "
+ f"processed, remaining {remaining} on processor")
+
+ def output_pipeline(self, nb: int, plugin: ExportPlugin) -> None:
+ # method, to be used after calling process_stream, that will consume nb
+ # elements from all registered data processors and export them using
+ # the provided compatible plugin.
+ for processor in self.processor:
+ tmp = []
+ for _ in range(nb):
+ # to make check whether there is something to output
+ if processor.storage:
+ tmp.append(processor.output())
+ plugin.process_output(tmp)
+
+
+def main() -> None:
+
+ # Create a test scenario that demonstrates the correct processing of a data
+ # stream. Display statistics on registered data processors, consume
+ # elements using the output method of each data processor and show updated
+ # statistics
+
+ print("=== Code Nexus - Data Pipeline ===")
+ print()
+ print("Initialize Data Stream...")
+ print()
+ processors = DataStream()
+ print("=== DataStream statistic ===")
+ processors.print_processors_stats()
+ print()
+ # initialize processors
+ is_num = NumericProcessor("Numeric Processor")
+ is_text = TextProcessor("Text Processor")
+ is_log = LogProcessor("Log Processor")
+ print("Registering Processors")
+ print()
+ processors.register_processor(is_num)
+ processors.register_processor(is_text)
+ processors.register_processor(is_log)
+ batch_one = ['Hello world',
+ [3.14, -1, 2.71],
+ [{
+ 'log_level': 'WARNING',
+ ' log_message': 'Telnet access! Use ssh instead'
+ },
+ {
+ 'log_level': 'INFO',
+ 'log_message': 'User wil is connected'
+ }],
+ 42,
+ ['Hi', 'five']]
+ print(f"Sending first batch of data on stream: {batch_one}")
+ processors.process_stream(batch_one)
+ print()
+ print("=== DataStream statistic ===")
+ processors.print_processors_stats()
+ print()
+ print("Send 3 processed data from each processor to a CSV plugin:")
+ PluginCSV = ExportCSV()
+ processors.output_pipeline(3, PluginCSV)
+ print()
+ print("=== DataStream statistic ===")
+ processors.print_processors_stats()
+ print()
+ batch_two = [21,
+ ['I love AI', 'LLMs are wonderful', 'Stay healthy'],
+ [{'log_level': ' ERROR', 'log_message': '500 server crash'},
+ {'log_level': 'NOTICE',
+ 'log_message': 'Certificate expires in 10 days'}],
+ [32, 42, 64, 84, 128, 168],
+ 'World hello']
+ print(f"Send another batch of data : {batch_two}")
+ processors.process_stream(batch_two)
+ print()
+ print("=== DataStream statistic ===")
+ processors.print_processors_stats()
+ print()
+ print("Send 5 processed data from each processor to a JSON plugin:")
+ PluginJSON = ExportJSON()
+ processors.output_pipeline(5, PluginJSON)
+ print()
+ print("=== DataStream statistic ===")
+ processors.print_processors_stats()
+
+
+if __name__ == "__main__":
+ main()