Introduction to RxJS in Angular

📘 Introduction to RxJS in Angular

RxJS (Reactive Extensions for JavaScript) is a powerful library that enables reactive programming using observable sequences. It’s a core part of Angular and is used extensively for managing asynchronous operations like HTTP calls, user inputs, WebSocket streams, and more.

❓ What is RxJS?

RxJS is based on the Observer pattern, where:

  • Observables emit data over time
  • Subscribers react to emitted values
It allows composing complex async logic with operators like map, filter, switchMap, and more.

✅ Why Use RxJS in Angular?

  • HttpClient – for making HTTP requests
  • ReactiveForms – for managing form controls
  • ActivatedRoute – for route parameters
  • NgRx – for state management

🔥 Benefits:

  • Efficient Data Streams
  • Event-Driven, Reactive Code
  • Better State Management
  • Powerful Operators
  • Easy Cancellation & Cleanup

🔍 Core Features of RxJS

1. Observables

Used to represent a data stream.


import { Observable } from 'rxjs';

const observable = new Observable(observer => {
  observer.next('Hello');
  observer.next('RxJS');
  observer.complete();
});

observable.subscribe(value => console.log(value));

2. Operators

Used to transform or filter the stream.


import { fromEvent } from 'rxjs';
import { debounceTime, map } from 'rxjs/operators';

const input = document.getElementById('searchBox') as HTMLInputElement;

const inputObservable = fromEvent(input, 'input').pipe(
  debounceTime(500),
  map(event => (event.target as HTMLInputElement).value)
);

inputObservable.subscribe(value => console.log('Search:', value));

3. Subjects

Subjects act as both an observable and an observer, enabling multicasting.


import { BehaviorSubject } from 'rxjs';

const subject = new BehaviorSubject('Initial Value');

subject.subscribe(val => console.log('Subscriber 1:', val));
subject.next('New Value');
subject.subscribe(val => console.log('Subscriber 2:', val));

4. Subscription Management in Angular

It's essential to unsubscribe from observables to avoid memory leaks.


import { Subscription, interval } from 'rxjs';

@Component({ /*...*/ })
export class DemoComponent implements OnInit, OnDestroy {
  private subscription!: Subscription;

  ngOnInit() {
    this.subscription = interval(1000).subscribe(val => {
      console.log('Tick:', val);
    });
  }

  ngOnDestroy() {
    this.subscription.unsubscribe();
    console.log('Unsubscribed on destroy');
  }
}

🛠️ Real Angular Example: Reactive Form with RxJS

📦 Country Service


@Injectable({ providedIn: 'root' })
export class CountryService {
  private apiUrl = 'https://yourapi.com/api';

  constructor(private http: HttpClient) {}

  getCountries(): Observable<Country[]> {
    return this.http.get<Country[]>(`${this.apiUrl}/countries`).pipe(
      catchError(error => {
        console.error('Error loading countries', error);
        return of([]);
      })
    );
  }

  getStatesByCountryId(countryId: number): Observable<State[]> {
    return this.http.get<State[]>(`${this.apiUrl}/states?countryId=${countryId}`).pipe(
      catchError(error => {
        console.error('Error loading states', error);
        return of([]);
      })
    );
  }
}

👨‍💼 EmployeeFormComponent


@Component({
  selector: 'app-employee-form',
  templateUrl: './employee-form.component.html'
})
export class EmployeeFormComponent implements OnInit, OnDestroy {
  form: FormGroup;
  countries: Country[] = [];
  states: State[] = [];
  loadingCountries = true;
  loadingStates = false;

  private subscriptions = new Subscription();

  constructor(
    private fb: FormBuilder,
    private countryService: CountryService
  ) {
    this.form = this.fb.group({
      name: [''],
      country: [''],
      state: ['']
    });
  }

  ngOnInit() {
    const countrySub = this.countryService.getCountries().subscribe(data => {
      this.countries = data;
      this.loadingCountries = false;
    });
    this.subscriptions.add(countrySub);

    const valueChangeSub = this.form.get('country')?.valueChanges.subscribe(countryId => {
      if (countryId) {
        this.loadingStates = true;
        this.countryService.getStatesByCountryId(countryId).subscribe(states => {
          this.states = states;
          this.loadingStates = false;
          this.form.get('state')?.setValue('');
        });
      } else {
        this.states = [];
        this.form.get('state')?.setValue('');
      }
    });

    this.subscriptions.add(valueChangeSub!);
  }

  onSubmit() {
    console.log('Form submitted:', this.form.value);
  }

  ngOnDestroy() {
    this.subscriptions.unsubscribe();
  }
}

📝 HTML Template (employee-form.component.html)


<form [formGroup]="form" (ngSubmit)="onSubmit()">
  <label>Name:</label>
  <input formControlName="name" type="text" class="form-control" />

  <label>Country:</label>
  <select formControlName="country" class="form-select">
    <option value="">Select Country</option>
    <option *ngFor="let country of countries" [value]="country.id">
      {{ country.name }}
    </option>
  </select>
  <span *ngIf="loadingCountries">Loading countries...</span>

  <label>State:</label>
  <select formControlName="state" [disabled]="!states.length" class="form-select">
    <option value="">Select State</option>
    <option *ngFor="let state of states" [value]="state.id">
      {{ state.name }}
    </option>
  </select>
  <span *ngIf="loadingStates">Loading states...</span>

  <button type="submit" class="btn btn-primary mt-3">Submit</button>
</form>

📊 RxJS Flow Diagram





🧠 Conclusion

RxJS is foundational in Angular for building reactive applications. Mastering observables, operators, and subscription lifecycles will significantly improve your ability to build scalable, performant apps.

Comments