1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
|
#!/usr/bin/env python3
#
# "Briefing: Your task is to integrate work from past exercices into a complete
# data processing pipeline that demonstrates understanding of polymorphic
# concepts. Use your code from Exercise 1 and improve it in order to obtain a
# complete data pipeline. Your DataStream class already handles input streams.
# You need now to handle the output part of the pipeline. This will be achieved
# by using a plugin system for export classes, made export-compatible through
# duck typing."
#
# https://typing.python.org/en/latest/spec/protocol.html
from abc import ABC, abstractmethod
import typing
import sys
class ExportPlugin(typing.Protocol):
def process_output(self, data: list[tuple[int, str]]) -> None:
# The type of the data parameter is a list of tuples that matches the
# return value of the output method from the DataProcessor class.
# output CSV or JSON
# if list output as CSV
...
class ExportCSV(ExportPlugin):
def process_output(self, data: list[tuple[int, str]]) -> None:
i = 0
print("CSV Output:")
for item in data:
print(data[i][1], end='')
if item is not data[-1]:
print(',', end='')
else:
print()
i += 1
class ExportJSON(ExportPlugin):
def process_output(self, data: list[tuple[int, str]]) -> None:
i = 0
print("JSON Output:")
print("{", end='')
for item in data:
print(f"\"item_{data[i][0]}\": \"{data[i][1]}\"", end='')
if item is not data[-1]:
print(', ', end='')
else:
print(end='')
i += 1
print("}")
class DataProcessor(ABC):
def __init__(self, name: str) -> None:
self.storage: typing.List[typing.Any] = []
self.rank: int = 0
self.counter: int = 0
self.name = name
@abstractmethod
def validate(self, data: typing.Any) -> bool:
pass
@abstractmethod
def ingest(self, data: typing.Any) -> None:
pass
def output(self) -> tuple[int, str]:
# extract the oldest piece of data
# piece of data is removed
position = self.rank
self.rank += 1
if isinstance(self.storage[0], dict):
entry = self.storage[0]
output_tuple: tuple = tuple(entry.values())
to_return = (f"{output_tuple[0]}: {output_tuple[1]}")
self.storage.pop(0)
return position, to_return
to_return = self.storage[0]
self.storage.pop(0)
return position, to_return
class NumericProcessor(DataProcessor):
def validate(self, data: int | float | typing.List[int | float]) -> bool:
try:
if isinstance(data, int):
self.counter += 1
return True
elif isinstance(data, float):
self.counter += 1
return True
elif isinstance(data, typing.List):
type_list = "yes"
for item in data:
self.counter += 1
if not isinstance(item, int | float):
type_list = "no"
self.counter -= 1
if type_list == "yes":
return True
else:
raise ValueError()
else:
raise ValueError()
except ValueError:
return False
def ingest(self, data: int | float | typing.List[int | float]) -> None:
try:
if isinstance(data, typing.List):
for item in data:
if isinstance(item, int | float):
self.storage.append(item)
elif isinstance(data, int | float):
self.storage.append(data)
else:
raise ValueError()
except ValueError:
print("Improper numeric data", file=sys.stderr)
def is_number(s: str) -> bool:
try:
float(s)
return True
except (ValueError, TypeError):
return False
class TextProcessor(DataProcessor):
def validate(self, data: str | typing.List[str]) -> bool:
try:
if isinstance(data, str):
if is_number(data):
raise ValueError()
else:
self.counter += 1
return True
elif isinstance(data, typing.List):
for item in data:
if isinstance(item, str):
if is_number(item):
raise ValueError()
return False
else:
self.counter += 1
if isinstance(item, dict):
raise ValueError()
return False
return True
else:
raise ValueError()
except ValueError:
return False
def ingest(self, data: str | typing.List[str]) -> None:
try:
if isinstance(data, str):
self.storage.append(data)
elif isinstance(data, typing.List):
for item in data:
if is_number(item):
raise ValueError()
elif isinstance(item, dict):
raise ValueError()
else:
self.storage.append(item)
else:
raise ValueError()
except ValueError:
print("Improper text data", file=sys.stderr)
class LogProcessor(DataProcessor):
def validate(self, data: dict | typing.List[dict]) -> bool:
try:
if isinstance(data, dict):
return True
if isinstance(data, typing.List):
for item in data:
if isinstance(item, dict):
self.counter += 1
else:
raise ValueError()
return False
return True
else:
raise ValueError()
except ValueError:
return False
def ingest(self, data: dict | typing.List[dict]) -> None:
try:
if isinstance(data, dict):
for item in data:
self.storage.append(item)
elif isinstance(data, typing.List):
for item in data:
if isinstance(item, dict):
self.storage.append(item)
else:
raise ValueError()
except ValueError:
print("Improper log data", file=sys.stderr)
class DataStream():
def __init__(self) -> None:
self.processor: list = []
self.counter: int = 0
def register_processor(self, proc: DataProcessor) -> None:
# method that allows you to register a new data processor to process
# the data stream.
self.processor.append(proc)
def process_stream(self, stream: list[typing.Any]) -> None:
# method that will analyze each element of the list received as a
# parameter and send it to the appropriate registered data processor.
# Error messages will be printed if no data processor can handle an
# element
for item in stream:
try:
for processor in self.processor:
# route towards appropriate processors
if processor.validate(item):
processor.ingest(item)
# stop when appropriate processors is found
break
else:
raise ValueError()
except ValueError:
print(f"Data Stream Error - Can't process "
f"element in stream: {item}", file=sys.stderr)
def print_processors_stats(self) -> None:
if self.processor == []:
print("No processor found, no data")
else:
for processor in self.processor:
remaining = len(processor.storage)
print(f"{processor.name}: total {processor.counter} items "
f"processed, remaining {remaining} on processor")
def output_pipeline(self, nb: int, plugin: ExportPlugin) -> None:
# method, to be used after calling process_stream, that will consume nb
# elements from all registered data processors and export them using
# the provided compatible plugin.
for processor in self.processor:
tmp = []
for _ in range(nb):
# to make check whether there is something to output
if processor.storage:
tmp.append(processor.output())
plugin.process_output(tmp)
def main() -> None:
# Create a test scenario that demonstrates the correct processing of a data
# stream. Display statistics on registered data processors, consume
# elements using the output method of each data processor and show updated
# statistics
print("=== Code Nexus - Data Pipeline ===")
print()
print("Initialize Data Stream...")
print()
processors = DataStream()
print("=== DataStream statistic ===")
processors.print_processors_stats()
print()
# initialize processors
is_num = NumericProcessor("Numeric Processor")
is_text = TextProcessor("Text Processor")
is_log = LogProcessor("Log Processor")
print("Registering Processors")
print()
processors.register_processor(is_num)
processors.register_processor(is_text)
processors.register_processor(is_log)
batch_one = ['Hello world',
[3.14, -1, 2.71],
[{
'log_level': 'WARNING',
' log_message': 'Telnet access! Use ssh instead'
},
{
'log_level': 'INFO',
'log_message': 'User wil is connected'
}],
42,
['Hi', 'five']]
print(f"Sending first batch of data on stream: {batch_one}")
processors.process_stream(batch_one)
print()
print("=== DataStream statistic ===")
processors.print_processors_stats()
print()
print("Send 3 processed data from each processor to a CSV plugin:")
PluginCSV = ExportCSV()
processors.output_pipeline(3, PluginCSV)
print()
print("=== DataStream statistic ===")
processors.print_processors_stats()
print()
batch_two = [21,
['I love AI', 'LLMs are wonderful', 'Stay healthy'],
[{'log_level': ' ERROR', 'log_message': '500 server crash'},
{'log_level': 'NOTICE',
'log_message': 'Certificate expires in 10 days'}],
[32, 42, 64, 84, 128, 168],
'World hello']
print(f"Send another batch of data : {batch_two}")
processors.process_stream(batch_two)
print()
print("=== DataStream statistic ===")
processors.print_processors_stats()
print()
print("Send 5 processed data from each processor to a JSON plugin:")
PluginJSON = ExportJSON()
processors.output_pipeline(5, PluginJSON)
print()
print("=== DataStream statistic ===")
processors.print_processors_stats()
if __name__ == "__main__":
main()
|