Примеры пользовательских реализаций в Python SDKBETANEW
В этой статье
Примеры реализаций Python-блоков
Блок агрегации
Блок суммирует значения колонки value
в разрезе колонки name
. Аналогичен поведению groupby
в SQL.
from src.blocks.src.abstract_block import AbstractBlock
import pandas as pd
class AggregateBlock(AbstractBlock):
def __init__(self):
info_output = {
"uuid" : "aggr_block_4152eecc-fa15-4102-911d-f76044ac5445",
"type" : "action",
"name" : {
"en" : "Testing block info",
"ru" : "Блок агрегации",
},
"description" : {
"en" : "Testing block description",
"ru" : "Режим агрегации предназначен для вычисления суммы значений sum(`value`) в разрезе имен (ключ `name`)",
},
"compatible_connections" : [],
"optionals" : {
"is_save_data_on_fail_block_type" : False,
"is_system_block_type" : False
},
"fields" : """[{
key: 'name',
type: 'text',
label: 'Name',
description: "Поле name",
required: true
}, {
key: 'value',
type: 'text',
label: 'Value',
description: "Поле value",
required: true
}]"""
}
block_input = {
"name":'String',
"value":'Long'
}
block_output = {
"default": [
{
"name": "name",
"type": "String"
},
{
"name": "aggr_sum",
"type": "Long"
},
],
}
super().__init__(block_input, block_output, info_output)
def set_block_output(self, data_element:list):
self._set_block_output('default')
def set_block_aggr_mode(self, data_element:list):
self._set_block_aggr_mode(True)
def process_data(self, **data):
block_data = data['block_data']
df = pd.DataFrame(block_data)
return df.groupby('name').sum().reset_index().to_numpy().tolist()
Блок изменения поведения
Блок выполняет операцию суммирование/вычитание над входными данными при изменении положения переключателя в UI (слайдер sw_1).
from src.blocks.src.abstract_block import AbstractBlock
class ChangeBehaviourBlock(AbstractBlock):
def __init__(self):
info_output = {
"uuid" : "change_beh_block_aba8682c-1040-413b-beb3-12f878e224aa",
"type" : "action",
"name" : {
"en" : "Testing block info",
"ru" : "Блок изменения поведения",
},
"description" : {
"en" : "Testing block description",
"ru" : "Этот блок изменяет структуру выходных данных в зависимости от значения поля `switcher`",
},
"compatible_connections" : [],
"optionals" : {
"is_save_data_on_fail_block_type" : False,
"is_system_block_type" : False
},
"fields" : """[{
key: 'num_1',
type: 'numberPlain',
label: 'Введи 1 число',
description: "Поле для ввода первого числа",
required: true
}, {
key: 'num_2',
type: 'numberPlain',
label: 'Введи 2 число',
description: "Поле для ввода второго числа",
required: true
}, {
key: 'sw_1',
type: 'switcher',
label: 'True - сумма, False - разность',
required: true
}]"""
}
block_input = {
"num_1":'Long',
"num_2":'Long',
"sw_1":'Boolean',
}
block_output = {
"addition": [
{
"name": "addition",
"type": "Long"
},
],
"subtract":[
{
"name": "subtract",
"type": "Long"
},
]
}
super().__init__(block_input, block_output, info_output)
def set_block_output(self, data_element:dict):
if data_element['sw_1'] == True: # Пример изменения поведения блока
self._set_block_output('addition')
else:
self._set_block_output('subtract')
def set_block_aggr_mode(self, data_element:list):
self._set_block_aggr_mode(False)
def process_data(self, **data):
block_data = data['block_data']
result = []
for row in block_data:
if row['sw_1'] == True:
result.append([
[
row['num_1']+row['num_2']
]
])
else:
result.append([
[
row['num_1']-row['num_2']
]
])
return result
Блок вывода типов входных полей
Блок выводит типы входных полей в соответствии с типами данных Python.
from src.blocks.src.abstract_block import AbstractBlock
import pandas as pd
class InputTypesEchoBlock(AbstractBlock):
def __init__(self):
info_output = {
"uuid" : "echo_types_block_2a79bf18-0fa6-4ccd-ac88-af89881937ef",
"type" : "action",
"name" : {
"en" : "Testing block info",
"ru" : "Блок возврата типов",
},
"description" : {
"en" : "Testing block description",
"ru" : "Вернуть типы Python входных полей",
},
"compatible_connections" : [],
"optionals" : {
"is_save_data_on_fail_block_type" : False,
"is_system_block_type" : False
},
"fields" : """[{
key: 'str_1',
type: 'text',
label: 'text',
description: "",
required: true
}, {
key: 'str_2',
type: 'textPlain',
label: 'textPlain',
description: "",
required: true
}, {
key: 'str_3',
type: 'textArea',
label: 'textArea',
description: "",
required: true
}, {
key: 'str_4',
type: 'jsArea',
label: 'jsArea',
description: "",
required: true
}, {
key: 'int_1',
type: 'numberPlain',
label: 'int_1',
description: "",
required: true
}, {
key: 'float_1',
type: 'text',
label: 'float_1',
description: "",
required: true
}, {
key: 'sw_1',
type: 'switcher',
label: 'sw_1',
description: "bool",
required: true
}, {
key: 'datetime_1',
type: 'text',
label: 'datetime_1',
description: "ISO дата",
required: true
}]"""
}
block_input = {
"str_1":'String',
"str_2":'String',
"str_3":'String',
"str_4":'String',
"int_1":'Long',
"float_1":'Double',
"sw_1":'Boolean',
"datetime_1":'DateTime',
}
block_output = {
"str": [
{
"name": "info_types",
"type": "String"
}
],
"multiple":[
{
"name": "info_types",
"type": "ObjectArray",
"struct": [
{
"name": "key",
"type": "String",
},
{
"name": "value",
"type": "String",
},
{
"name": "type",
"type": "String",
}
]
}
]
}
super().__init__(block_input, block_output, info_output)
self.custom_user_selector = 'multiple'
def set_block_output(self, data_element:list):
self._set_block_output(self.custom_user_selector)
def set_block_aggr_mode(self, data_element:list):
self._set_block_aggr_mode(False)
def process_data(self, **data):
block_data = data['block_data'][0]
if self.custom_user_selector == 'str':
# Variant 1 - single str output
result = []
for key in block_data:
result.append(f'Key: {key}, Value: {block_data[key]}, Python type: {type(block_data[key]).__name__}')
return [
[
[
str(result)
],
]
]
elif self.custom_user_selector == 'multiple':
# Variant 2 - multiple outputs
result = []
for key in block_data:
result.append(
{
"key":str(key),
"value":str(block_data[key]),
"type":str(type(block_data[key]).__name__)
}
)
return [[result]]
else:
raise Exception('Unknown custom user selector choice!')
Блок обучения модели
Блок содержит 2 режима:
- Обучение модели для предсказания значений y в соответствии со значениями X
- Формирование предсказаний значений y в зависимости от значений X
Режимы переключаются с помощью слайдера sw_1. Модель сохраняется в папку, указанную в переданном подключении.
from src.blocks.src.abstract_block import AbstractBlock
from src.connections.vault_connection import VaultConnection
from sklearn.linear_model import LinearRegression
import joblib
import pandas as pd
import numpy as np
import os
import pathlib
class DefaultBlock(AbstractBlock):
def __init__(self):
info_output = {
"uuid" : "train_block_0cb4b748-6584-4ef2-a849-b56eb620f891",
"type" : "action",
"name" : {
"en" : "Train model block",
"ru" : "Блок обучения модели",
},
"description" : {
"en" : "Train model block description",
"ru" : "Описание блока обучения модели",
},
"compatible_connections" : [VaultConnection().uuid],
"optionals" : {
"is_save_data_on_fail_block_type" : False,
"is_system_block_type" : False
},
"fields" : """[{
key: 'X',
type: 'text',
label: 'Фича X',
description: "Поле для ввода первого числа",
required: true
}, {
key: 'switch_mode',
type: 'switcher',
label: 'True - обучение модели, False - инференс',
required: true
}, (z, bundle) => {
if (bundle.inputData.switch_mode) {
return [{
key: 'y',
type: 'text',
label: 'Таргет y',
required: false
}];
} else {
return [];
}
}]"""
}
block_input = {
"X":'Long',
"y":'Long',
"switch_mode":'Boolean',
}
block_output = {
"train": [
{
"name": "score",
"type": "Double"
}
],
"eval":[
{
"name": "eval_y",
"type": "Long"
},
]
}
super().__init__(block_input, block_output, info_output)
def set_block_output(self, data_element:list):
if data_element['switch_mode']:
self._set_block_output('train')
else:
self._set_block_output('eval')
def set_block_aggr_mode(self, data_element:list):
if data_element['switch_mode']:
self._set_block_aggr_mode(True)
else:
self._set_block_aggr_mode(False)
def process_data(self, **data):
connection_data = data['connection_data']
block_data = data['block_data']
vault_path = data['vault_path']
if block_data[0]['switch_mode']:
# Train
df = pd.DataFrame(block_data)
model = LinearRegression().fit(
df['X'].to_numpy().reshape(-1,1),
df['y'].to_numpy().reshape(-1,1)
)
# Check folder
pathlib.Path(os.path.join(vault_path, connection_data['folder_name'])).mkdir(parents=True, exist_ok=True)
joblib.dump(model, os.path.join(vault_path, connection_data['folder_name'], 'model.pkl'))
return [
[
model.score(
df['X'].to_numpy().reshape(-1,1),
df['y'].to_numpy().reshape(-1,1)
)
]
]
else:
model = joblib.load(
os.path.join(
vault_path,
connection_data['folder_name'],
'model.pkl'
)
)
result = model.predict(pd.DataFrame(block_data)['X'].to_numpy().reshape(-1,1)).tolist()
# Eval
return [[[round(r[0])]] for r in result]
Примеры реализаций Python-подключений
Подключение-хранилище
Блок содержит имя директории в хранилище. Может быть использовано для хранения мета-информации между вызовами блока. Например, в блоке обучения модели в директории хранится обучаемая модель.
from src.connections.src.abstract_connection import AbstractConnection
class VaultConnection(AbstractConnection):
def __init__(self):
uuid = "vault_connection_0dfa4e9a-b697-4812-9aa5-4497769fbf0c"
name = {
"en" : "Vault connection info",
"ru" : "Подключение хранилища",
}
description = {
"en" : "Testing connection description",
"ru" : "Описание тестового подключения",
}
optionals = {
"is_category_1" : False,
"option_2" : "category_2"
}
fields = """[{
"key" : "folder_name",
"type" : "textPlain",
"label" : "Имя папки",
"description" : "Имя папки для хранения модели",
"is_required" : true,
"optionals" : {
"visible" : true
}
}]"""
connection_input = {
"folder_name":'String'
}
super().__init__(uuid, name, description, optionals, fields, connection_input)
Была ли статья полезна?
Да
Нет