Pipeline

Pipeline

architekturanávrhové vzory

Headshot Bronislav Klučka, 22. 8. 2025 16:43

V minulém článku jsem se věnoval rules engine patternu, patternu, jehož motivací je separation of concerns, high cohesion and low coupling. Dneska v tom budeme pokračovat dalším patternem, který podporuje tyto principy: pipeline

Začněme 2 ukázkami.

Příklad 1

Úkolem je otevřít CSV soubor s objednávkami, vyfiltrovat objednávky pro konkrétní produkty a poslat tyto objednávky pomocí API na službu, která zpracovává objednávky.

Pseudo-implementace

function uploadOrders() {
	const file = fileSystem.open('./file.csv');
	while (const row = fileSystem.readCSV(file)) {
		if (row[3] === 'item1') {
			const customer = customerService.get(row[0]);
			const price = priceService.get(row[2]);
			const data = {
				name: customer.lastName + ' ' + customer.firstName,
				count: row[1],
				unitPrice: price.value,
				id: row[3],
			}
			net.post('https://company.com/api/order', { body: data });
		}
	}
	fileSystem.close(file);
}

Kód vypadá v pohodě, je rozumně krátký, ale zkuste si představit, že bude funkcionalita narůstat: co když může být více zdrojů, nejen CSV; co když může být více podmínek, co když formát API bude složitější. A v určitou chvíli, se daný kód může stát nečitelným.

Příklad 2

Úkolem bude autentifikovat a autorizovat request na API.

Pseudo-implementace

function authRequest() {
	const headers = request.headersAsMap();
	let user = null;
	if (headers.authorization && headers.authorization.startsWith('Bearer ')) {
		user = authModule.getUserByAuthzToken(headers.authorization); // returns User | null
	}
	if (user === null) throw Error('...')

	const allowedRoutes = routesModule.getValidRoutesForUser(user);
	if (!allowedRoutes[request.method] || !allowedRoutes[request.method].includes(request.path)) {
		throw Error('...')
	}

	let data = url.parseQuery(request.queryParams) ?? {};
	data = merge(data, JSON.parse(request.body));
	if (!validateApiRequest(request.method, request.path, data)) {
		throw Error('...')
	}

	return {
		user,
		data
	}
}

Co když budeme mít více autentikačních metod, nebo budou složitější? Co když upravíme logiku autorizace na komplexní systém práv? Už i tato ukázka je na hraně...

Pipeline

pipeline

Pipeline je pattern, který rozdělí sekvenci procesů do jednotlivých kroků a jejich orchestraci. Místo jednoho velkého procesu je pipeline pattern složen z několika nezávislých subprocesů, které o sobě nevědí a je na nadřazeném procesu, orchestrátoru, aby je provedl v požadovaném pořadí.

Základní vlastnosti:

  • jedná se sekvenční zpracování dat
  • záleží na pořadí kroků

Příklad 1

interface Order {
	customerId: string;
	priceId: string;
	count: number;
	itemId: string;
}

interface ApiOrder {
	name: string;
	price: number;
	count: number;
	id: string;
}

function getOrders(): Order[] {
	const file = fileSystem.open('./file.csv');
	const result: Order[] = [];

	while (const row = fileSystem.readCSV(file)) {
		result.push({
			customerId: row[0];
			count: row[1];
			priceId: row[2];
			itemId: row[3];
		})
	}
	fileSystem.close(file);
	return result;
}

function filterOrder(order: Order): boolean {
	return order.itemId === 'item1';
}

function transformOrder(order: Order): ApiOrder {
	const customer = customerService.get(order.customerId);
	const price = priceService.get(order.priceId);
	return {
		name: customer.lastName + ' ' + customer.firstName,
		count: order.count,
		unitPrice: price.value,
		id: order.itemId,
	}
}

function sendOrder(order: ApiOrder): boolean {
	return net.post('https://company.com/api/order', { body: order });
}

// now, let's have fun

function processAllOrders1() {
	getOrders()
		.filter(filterOrder)
		.map(transformOrder)
		.map(sendOrder);
}

/**
* but what if getOrders() would return million records? how about memory consumption?
* generator to the rescue
*/

function* getOrders2(): Generator<Order[]>  {
	const file = fileSystem.open('./file.csv');

	while (const row = fileSystem.readCSV(file)) {
		yield {
			customerId: row[0];
			count: row[1];
			priceId: row[2];
			itemId: row[3];
		}
	}
	fileSystem.close(file);
}

function processAllOrders2() {

	// build iterator
	const iter = getOrders2()
		.filter(filterOrder)
		.map(transformOrder)
		.map(sendOrder)

	// run it
	let result = iterator.next();
	while (!result.done) {
		result = iterator.next();
	}
}

Najednou, pokud budeme mít jiný zdroj objednávek, jediné, co upravíme je zdroj objednávek bez dopadu na vše ostatní, apod.

Příklad 2

interface Context {
	user: User | null;
	routeAuthorized: boolean;
	data: object | null;
}

function authnUser(request: Request, context: Context): void {
	const headers = request.headersAsMap();
	if (headers.authorization && headers.authorization.startsWith('Bearer ')) {
		context.user = authModule.getUserByAuthzToken(headers.authorization); // returns User | null
	}
}

function authzUser(request: Request, context: Context): void {
	if (context.user !== null) {
		const allowedRoutes = routesModule.getValidRoutesForUser(context.user);
		context.routeAuthorized = allowedRoutes[request.method] && allowedRoutes[request.method].includes(request.path)
	}
}

function requestData(request: Request, context: Context): void {
	if (context.routeAuthorized) {
		let data = url.parseQuery(request.queryParams) ?? {};
		data = merge(data, JSON.parse(request.body));
		if (validateApiRequest(request.method, request.path, data)) {
			context.data = data
		}
	}
}


function authRequest(): Context {
	const result = {
		user: null;
		routeAuthorized: false;
		data: null;
	}
	authnUser(request, result);
	if (result.user === null) throw Error('...');
	authzUser(request, result);
	if (result.routeAuthorized === false) throw Error('...');
	requestData(request, result);
	return result;
}

Jasné rozdělení odpovědností, jednoduchá rozšiřitelnost.

Na co všechno můžete pipeline pattern použít?

  • ETL / SFT procesy
  • aplikace filtrů/transformací na data, např. obrázky
  • kompilátor - tokenizace, parsování, kompilace, assembly, linking
  • interpreter - tokenizace, parsování, exekuce
  • staging - komplexní proces v několika stages v několika modulech (např. zpracování objednávky - platba, sklad, odeslání, notifikace)
  • CI/CD pipeline

A mnoho dalších.

Headshot Bronislav Klučka, 22. 8. 2025 16:43

Komentáře

Napište komentář

Komentáře jsou schvalované, než se zobrazí na stránkách.

Nebylo možné přidat komentář.

Váš komentář byl přidán a čeká se na schválení.

Váš komentář je příliš dlouhý.

Používáme cookies a podobné technologie, jako Google Analytics, pro sběr analytických dat. To nám pomáhá pochopit, jak uživatelé používají naše stránky.

Více info

Stránky používají Google Analytics, a analytické služby poskytované společností Google. Google Analytics používá cookies, aby nám tato služba pomohla analyzovat, jak uživatelé používají naše stránky. Informace generované cookies, které se týkají vašeho používání stránek (včetně vaší IP adresy) budou přeneseny a uloženy u společnosti Google. Požíváme tato data pro vytváření reportů o aktivitě a k poskytování dalších služeb, které se týkají těchto stránek.

Analytická data nám pomáhají vylepšovat naše služby. Nepoužíváme je k marketingovým a reklamním účelům. Tato data nepředáváme ani neprodáváme dál.