Средний 11 мин чтения
RxJS основы
RxJS — библиотека реактивного программирования на основе Observable. Angular использует RxJS для работы с HTTP, формами и асинхронными событиями.
RxJSObservableoperatorsAngularreactive
Что такое Observable?
Observable — поток данных, на который можно подписаться. В отличие от Promise, Observable может:
- Испускать несколько значений
- Быть отменён (через unsubscribe)
- Быть синхронным или асинхронным
import { Observable, of, from, interval } from 'rxjs'
// Создание Observable
const numbers$ = new Observable<number>(subscriber => {
subscriber.next(1)
subscriber.next(2)
subscriber.next(3)
subscriber.complete()
})
// Подписка
const sub = numbers$.subscribe({
next: (value) => console.log(value), // 1, 2, 3
error: (err) => console.error(err),
complete: () => console.log('Готово'),
})
// Отписка
sub.unsubscribe()
Вспомогательные функции создания
// of — из значений
of(1, 2, 3).subscribe(console.log) // 1, 2, 3
// from — из массива, Promise, итерабельного
from([1, 2, 3]).subscribe(console.log)
from(fetch('/api/data')).subscribe(console.log) // Promise → Observable
// interval — через интервал
interval(1000).subscribe(n => console.log(n)) // 0, 1, 2... каждую секунду
// timer — задержка + интервал
timer(2000, 1000).subscribe(n => console.log(n)) // Начало через 2с, потом каждую 1с
// fromEvent — DOM-события
fromEvent(document, 'click').subscribe(e => console.log(e))
Операторы
Операторы трансформируют потоки. Используются через pipe():
Трансформация
import { map, filter, tap, take } from 'rxjs/operators'
// map — преобразование значения
from([1, 2, 3]).pipe(
map(n => n * 2)
).subscribe(console.log) // 2, 4, 6
// filter — фильтрация
from([1, 2, 3, 4, 5]).pipe(
filter(n => n % 2 === 0)
).subscribe(console.log) // 2, 4
// tap — побочный эффект без изменения значения
getData().pipe(
tap(data => console.log('Получено:', data))
).subscribe(process)
// take — взять N значений
interval(1000).pipe(
take(5)
).subscribe(console.log) // 0, 1, 2, 3, 4 → complete
Работа с HTTP и вложенными Observable
import { switchMap, mergeMap, exhaustMap, concatMap } from 'rxjs/operators'
// switchMap — отменяет предыдущий Observable при новом значении
// Идеально для поиска
searchInput$.pipe(
debounceTime(300),
switchMap(query => this.api.search(query))
).subscribe(results => this.results = results)
// mergeMap — параллельное выполнение
// Для нескольких независимых запросов
ids$.pipe(
mergeMap(id => this.api.getItem(id))
).subscribe(item => this.items.push(item))
// concatMap — последовательное выполнение
// Для операций, требующих порядка
actions$.pipe(
concatMap(action => this.api.process(action))
).subscribe()
// exhaustMap — игнорирует новые, пока текущий не завершён
// Для кнопки отправки формы
submitClick$.pipe(
exhaustMap(() => this.api.submit(formData))
).subscribe()
Управление временем
import { debounceTime, throttleTime, delay, timeout } from 'rxjs/operators'
// debounceTime — ждать паузу N мс после последнего события
searchInput$.pipe(
debounceTime(400)
).subscribe(query => this.search(query))
// throttleTime — не чаще одного раза в N мс
scrollEvent$.pipe(
throttleTime(100)
).subscribe(this.handleScroll)
// timeout — ошибка если нет значения за N мс
api.getData().pipe(
timeout(5000)
).subscribe({
error: (err) => console.error('Таймаут!')
})
Обработка ошибок
import { catchError, retry, retryWhen } from 'rxjs/operators'
// catchError — перехватить и вернуть другой Observable
this.api.getUsers().pipe(
catchError(err => of([])) // При ошибке вернуть пустой массив
).subscribe()
// retry — повторить N раз при ошибке
this.api.getData().pipe(
retry(3)
).subscribe()
Subject
Subject — одновременно Observable и Observer. Позволяет императивно генерировать события:
import { Subject, BehaviorSubject } from 'rxjs'
// Subject — нет начального значения, только новые подписчики
const events$ = new Subject<string>()
events$.subscribe(e => console.log('Sub 1:', e))
events$.next('click') // Sub 1: click
// BehaviorSubject — хранит последнее значение
const user$ = new BehaviorSubject<User | null>(null)
user$.subscribe(u => console.log(u)) // null сразу при подписке
user$.next({ name: 'Иван' }) // { name: 'Иван' }
console.log(user$.getValue()) // Синхронное получение
Управление подписками в Angular
@Component({ template: `{{ user$ | async }}` })
export class UserComponent implements OnDestroy {
// Способ 1: async pipe (рекомендуется)
user$ = this.userService.getUser(1)
// Способ 2: takeUntilDestroyed (Angular 16+)
constructor() {
this.userService.getAll()
.pipe(takeUntilDestroyed())
.subscribe(users => this.users = users)
}
// Способ 3: ручная отписка
private sub = new Subscription()
ngOnInit() {
this.sub.add(
interval(1000).subscribe(n => this.time = n)
)
}
ngOnDestroy() {
this.sub.unsubscribe() // Предотвращает утечку памяти
}
}