
Pipeline
Bronislav Klučka, 22. 8. 2025 16:43
V minulém článku jsem se věnoval rules engine patternu, patternu, jehož motivací je separation of concerns, high cohesion and low coupling. Dneska v tom budeme pokračovat dalším patternem, který podporuje tyto principy: pipeline
Začněme 2 ukázkami.
Příklad 1
Úkolem je otevřít CSV soubor s objednávkami, vyfiltrovat objednávky pro konkrétní produkty a poslat tyto objednávky pomocí API na službu, která zpracovává objednávky.
Pseudo-implementace
function uploadOrders() {
const file = fileSystem.open('./file.csv');
while (const row = fileSystem.readCSV(file)) {
if (row[3] === 'item1') {
const customer = customerService.get(row[0]);
const price = priceService.get(row[2]);
const data = {
name: customer.lastName + ' ' + customer.firstName,
count: row[1],
unitPrice: price.value,
id: row[3],
}
net.post('https://company.com/api/order', { body: data });
}
}
fileSystem.close(file);
}
Kód vypadá v pohodě, je rozumně krátký, ale zkuste si představit, že bude funkcionalita narůstat: co když může být více zdrojů, nejen CSV; co když může být více podmínek, co když formát API bude složitější. A v určitou chvíli, se daný kód může stát nečitelným.
Příklad 2
Úkolem bude autentifikovat a autorizovat request na API.
Pseudo-implementace
function authRequest() {
const headers = request.headersAsMap();
let user = null;
if (headers.authorization && headers.authorization.startsWith('Bearer ')) {
user = authModule.getUserByAuthzToken(headers.authorization); // returns User | null
}
if (user === null) throw Error('...')
const allowedRoutes = routesModule.getValidRoutesForUser(user);
if (!allowedRoutes[request.method] || !allowedRoutes[request.method].includes(request.path)) {
throw Error('...')
}
let data = url.parseQuery(request.queryParams) ?? {};
data = merge(data, JSON.parse(request.body));
if (!validateApiRequest(request.method, request.path, data)) {
throw Error('...')
}
return {
user,
data
}
}
Co když budeme mít více autentikačních metod, nebo budou složitější? Co když upravíme logiku autorizace na komplexní systém práv? Už i tato ukázka je na hraně...
Pipeline
Pipeline je pattern, který rozdělí sekvenci procesů do jednotlivých kroků a jejich orchestraci. Místo jednoho velkého procesu je pipeline pattern složen z několika nezávislých subprocesů, které o sobě nevědí a je na nadřazeném procesu, orchestrátoru, aby je provedl v požadovaném pořadí.
Základní vlastnosti:
- jedná se sekvenční zpracování dat
- záleží na pořadí kroků
Příklad 1
interface Order {
customerId: string;
priceId: string;
count: number;
itemId: string;
}
interface ApiOrder {
name: string;
price: number;
count: number;
id: string;
}
function getOrders(): Order[] {
const file = fileSystem.open('./file.csv');
const result: Order[] = [];
while (const row = fileSystem.readCSV(file)) {
result.push({
customerId: row[0];
count: row[1];
priceId: row[2];
itemId: row[3];
})
}
fileSystem.close(file);
return result;
}
function filterOrder(order: Order): boolean {
return order.itemId === 'item1';
}
function transformOrder(order: Order): ApiOrder {
const customer = customerService.get(order.customerId);
const price = priceService.get(order.priceId);
return {
name: customer.lastName + ' ' + customer.firstName,
count: order.count,
unitPrice: price.value,
id: order.itemId,
}
}
function sendOrder(order: ApiOrder): boolean {
return net.post('https://company.com/api/order', { body: order });
}
// now, let's have fun
function processAllOrders1() {
getOrders()
.filter(filterOrder)
.map(transformOrder)
.map(sendOrder);
}
/**
* but what if getOrders() would return million records? how about memory consumption?
* generator to the rescue
*/
function* getOrders2(): Generator<Order[]> {
const file = fileSystem.open('./file.csv');
while (const row = fileSystem.readCSV(file)) {
yield {
customerId: row[0];
count: row[1];
priceId: row[2];
itemId: row[3];
}
}
fileSystem.close(file);
}
function processAllOrders2() {
// build iterator
const iter = getOrders2()
.filter(filterOrder)
.map(transformOrder)
.map(sendOrder)
// run it
let result = iterator.next();
while (!result.done) {
result = iterator.next();
}
}
Najednou, pokud budeme mít jiný zdroj objednávek, jediné, co upravíme je zdroj objednávek bez dopadu na vše ostatní, apod.
Příklad 2
interface Context {
user: User | null;
routeAuthorized: boolean;
data: object | null;
}
function authnUser(request: Request, context: Context): void {
const headers = request.headersAsMap();
if (headers.authorization && headers.authorization.startsWith('Bearer ')) {
context.user = authModule.getUserByAuthzToken(headers.authorization); // returns User | null
}
}
function authzUser(request: Request, context: Context): void {
if (context.user !== null) {
const allowedRoutes = routesModule.getValidRoutesForUser(context.user);
context.routeAuthorized = allowedRoutes[request.method] && allowedRoutes[request.method].includes(request.path)
}
}
function requestData(request: Request, context: Context): void {
if (context.routeAuthorized) {
let data = url.parseQuery(request.queryParams) ?? {};
data = merge(data, JSON.parse(request.body));
if (validateApiRequest(request.method, request.path, data)) {
context.data = data
}
}
}
function authRequest(): Context {
const result = {
user: null;
routeAuthorized: false;
data: null;
}
authnUser(request, result);
if (result.user === null) throw Error('...');
authzUser(request, result);
if (result.routeAuthorized === false) throw Error('...');
requestData(request, result);
return result;
}
Jasné rozdělení odpovědností, jednoduchá rozšiřitelnost.
Na co všechno můžete pipeline pattern použít?
- ETL / SFT procesy
- aplikace filtrů/transformací na data, např. obrázky
- kompilátor - tokenizace, parsování, kompilace, assembly, linking
- interpreter - tokenizace, parsování, exekuce
- staging - komplexní proces v několika stages v několika modulech (např. zpracování objednávky - platba, sklad, odeslání, notifikace)
- CI/CD pipeline
A mnoho dalších.