Introduction to RxJS in Angular

RxJS (Reactive Extensions for JavaScript) is a library for reactive programming with observable sequences. Angular depends on it and uses it extensively for asynchronous work such as HTTP calls, user input, and WebSocket streams.

❓ What is RxJS?

RxJS is based on the Observer pattern, where:

  • Observables emit data over time
  • Subscribers react to emitted values

RxJS lets you compose complex asynchronous logic with operators such as map, filter, and switchMap.
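
To make the Observer pattern concrete, here is a minimal hand-rolled sketch in plain TypeScript. It is not how RxJS is implemented, and `TinyObservable`, `emit`, and `clicks` are hypothetical names used only for illustration. It behaves more like an RxJS Subject than a cold Observable, because values are pushed to whoever is listening at the time.

```typescript
// A simplified model of the Observer pattern (not RxJS itself).
type Listener<T> = (value: T) => void;

class TinyObservable<T> {
  private listeners: Listener<T>[] = [];

  // Register a listener; the returned function removes it again.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }

  // Push a value to every listener that is currently registered.
  emit(value: T): void {
    this.listeners.forEach(listener => listener(value));
  }
}

const clicks = new TinyObservable<number>();
const received: number[] = [];

const unsubscribe = clicks.subscribe(value => received.push(value));
clicks.emit(1);
clicks.emit(2);
unsubscribe();
clicks.emit(3); // ignored: the listener was removed

console.log(received); // [1, 2]
```

RxJS adds completion, error handling, and operators on top of this basic idea.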

✅ Why Use RxJS in Angular?

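Angular exposes observables throughout its own APIs, and popular libraries build on them:

  • HttpClient – request methods return observables of the HTTP response
  • Reactive Forms – valueChanges and statusChanges on form controls
  • ActivatedRoute – params, paramMap, and queryParams for route parameters
  • NgRx – a separate state-management library built on RxJS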

🔥 Benefits:

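  • Efficient data streams – one model for HTTP responses, DOM events, timers, and WebSocket messages
  • Event-driven, reactive code – the UI reacts to values as they arrive instead of polling for them
  • Better state management – a BehaviorSubject can hold shared state and broadcast every change
  • Powerful operators – map, filter, debounceTime, switchMap, and others replace hand-written async plumbing
  • Easy cancellation and cleanup – unsubscribing stops the work and releases its resources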

🔍 Core Features of RxJS

1. Observables

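An Observable represents a stream of values delivered over time. Nothing happens until a consumer subscribes.


import { Observable } from 'rxjs';

// The callback runs once per subscriber, at subscribe time.
const observable = new Observable<string>(observer => {
  observer.next('Hello');
  observer.next('RxJS');
  observer.complete(); // no further values are delivered after this
});

// Logs "Hello", then "RxJS".
observable.subscribe(value => console.log(value));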

2. Operators

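Operators transform, filter, or combine the values in a stream. They are applied with pipe() and return a new observable, leaving the source untouched.


import { fromEvent } from 'rxjs';
import { debounceTime, map } from 'rxjs/operators';

// Assumes an <input id="searchBox"> element exists in the page.
const input = document.getElementById('searchBox') as HTMLInputElement;

const inputObservable = fromEvent(input, 'input').pipe(
  debounceTime(500), // wait for 500 ms of silence before passing a value on
  map(event => (event.target as HTMLInputElement).value) // keep only the text
);

inputObservable.subscribe(value => console.log('Search:', value));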
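
The same pipe() mechanics can be tried without a browser. This is a small sketch that assumes only that the rxjs package is installed; the numbers and the `results` array are made up for illustration.

```typescript
import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

const results: number[] = [];

of(1, 2, 3, 4, 5, 6)
  .pipe(
    filter(n => n % 2 === 0), // keep even numbers only
    map(n => n * 10)          // then scale each remaining value
  )
  .subscribe(value => results.push(value));

console.log(results); // [20, 40, 60]
```

Operators run in the order they are listed, so map only ever sees the values that filter let through.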

3. Subjects

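Subjects act as both an observable and an observer, enabling multicasting: one call to next() reaches every current subscriber. A BehaviorSubject also stores its latest value and replays it to each new subscriber.


import { BehaviorSubject } from 'rxjs';

const subject = new BehaviorSubject<string>('Initial Value');

subject.subscribe(val => console.log('Subscriber 1:', val)); // Subscriber 1: Initial Value
subject.next('New Value');                                   // Subscriber 1: New Value
subject.subscribe(val => console.log('Subscriber 2:', val)); // Subscriber 2: New Value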
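
The Subject variants differ mainly in what a late subscriber receives. The sketch below compares three of them; the values and the `seen` object are illustrative only.

```typescript
import { BehaviorSubject, ReplaySubject, Subject } from 'rxjs';

const plain = new Subject<string>();
const behavior = new BehaviorSubject<string>('init');
const replay = new ReplaySubject<string>(2); // buffers the last 2 values

// Emit three values before anyone subscribes.
for (const value of ['a', 'b', 'c']) {
  plain.next(value);
  behavior.next(value);
  replay.next(value);
}

const seen = {
  plain: [] as string[],
  behavior: [] as string[],
  replay: [] as string[]
};

// Late subscribers: each variant replays a different amount of history.
plain.subscribe(v => seen.plain.push(v));
behavior.subscribe(v => seen.behavior.push(v));
replay.subscribe(v => seen.replay.push(v));

console.log(seen.plain);    // []
console.log(seen.behavior); // ['c']
console.log(seen.replay);   // ['b', 'c']
```

A plain Subject suits one-off events, while a BehaviorSubject fits "current value" state such as a selected item or the logged-in user.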

4. Subscription Management in Angular

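Long-lived observables such as interval() or a form control's valueChanges keep emitting until you unsubscribe, so a component that forgets to unsubscribe leaks memory and keeps running callbacks after it is destroyed.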


import { Component, OnDestroy, OnInit } from '@angular/core';
import { Subscription, interval } from 'rxjs';

@Component({ /*...*/ })
export class DemoComponent implements OnInit, OnDestroy {
  private subscription!: Subscription;

  ngOnInit() {
    this.subscription = interval(1000).subscribe(val => {
      console.log('Tick:', val);
    });
  }

  ngOnDestroy() {
    this.subscription.unsubscribe();
    console.log('Unsubscribed on destroy');
  }
}
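
A common alternative to storing each Subscription is the takeUntil pattern: every stream is piped through takeUntil(destroy$), and a single notifier ends them all. The sketch below uses a plain class so it runs outside Angular. `TickerWidget`, `init`, and `destroy` are hypothetical stand-ins for a component with ngOnInit and ngOnDestroy.

```typescript
import { Observable, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

class TickerWidget {
  // Emits once when the widget is torn down.
  private readonly destroy$ = new Subject<void>();
  readonly received: number[] = [];

  // Stand-in for ngOnInit.
  init(source: Observable<number>): void {
    source
      .pipe(takeUntil(this.destroy$)) // complete as soon as destroy$ emits
      .subscribe(value => this.received.push(value));
  }

  // Stand-in for ngOnDestroy.
  destroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

const source = new Subject<number>();
const widget = new TickerWidget();

widget.init(source);
source.next(1);
source.next(2);
widget.destroy();
source.next(3); // not received: the subscription ended on destroy

console.log(widget.received); // [1, 2]
```

Placing takeUntil last in the pipe is the usual advice, so that no operator after it can keep an inner subscription alive.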

🛠️ Real Angular Example: Reactive Form with RxJS

📦 Country Service


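import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, of } from 'rxjs';
import { catchError } from 'rxjs/operators';

// Assumed shapes, inferred from the fields the template uses (id, name).
export interface Country {
  id: number;
  name: string;
}

export interface State {
  id: number;
  name: string;
}

@Injectable({ providedIn: 'root' })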
export class CountryService {
  private apiUrl = 'https://yourapi.com/api';

  constructor(private http: HttpClient) {}

  getCountries(): Observable<Country[]> {
    return this.http.get<Country[]>(`${this.apiUrl}/countries`).pipe(
      catchError(error => {
        console.error('Error loading countries', error);
        return of([]);
      })
    );
  }

  getStatesByCountryId(countryId: number): Observable<State[]> {
    return this.http.get<State[]>(`${this.apiUrl}/states?countryId=${countryId}`).pipe(
      catchError(error => {
        console.error('Error loading states', error);
        return of([]);
      })
    );
  }
}
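
The catchError fallback used in the service can be exercised without a backend. This is a sketch under stated assumptions: `failingRequest` and `withFallback` are hypothetical helpers, and throwError stands in for a failed HTTP call.

```typescript
import { Observable, of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';

interface Country {
  id: number;
  name: string;
}

// Stand-in for an HTTP request that fails.
function failingRequest(): Observable<Country[]> {
  return throwError(() => new Error('500 Internal Server Error'));
}

// Replace any error with an empty list so subscribers never see a failure.
function withFallback(source: Observable<Country[]>): Observable<Country[]> {
  return source.pipe(catchError(() => of([] as Country[])));
}

const emitted: Country[][] = [];
let completed = false;

withFallback(failingRequest()).subscribe({
  next: list => emitted.push(list),
  complete: () => {
    completed = true;
  }
});

console.log(emitted.length, completed); // 1 true
```

The trade-off is that callers can no longer tell "no countries" apart from "the request failed". If the UI needs to show an error message, rethrowing or returning a richer result object may be the better choice.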

👨‍💼 EmployeeFormComponent


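import { Component, OnDestroy, OnInit } from '@angular/core';
import { FormBuilder, FormGroup } from '@angular/forms';
import { Subscription, of } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';
// The import path is an assumption; adjust it to where the service and types live.
import { Country, CountryService, State } from './country.service';

@Component({
  selector: 'app-employee-form',
  templateUrl: './employee-form.component.html'
})
export class EmployeeFormComponent implements OnInit, OnDestroy {
  form: FormGroup;
  countries: Country[] = [];
  states: State[] = [];
  loadingCountries = true;
  loadingStates = false;

  private subscriptions = new Subscription();

  constructor(
    private fb: FormBuilder,
    private countryService: CountryService
  ) {
    this.form = this.fb.group({
      name: [''],
      country: [''],
      state: ['']
    });
  }

  ngOnInit() {
    const countrySub = this.countryService.getCountries().subscribe(data => {
      this.countries = data;
      this.loadingCountries = false;
    });
    this.subscriptions.add(countrySub);

    const countryControl = this.form.get('country');
    if (countryControl) {
      const valueChangeSub = countryControl.valueChanges.pipe(
        tap(countryId => {
          // Reset dependent state as soon as the selection changes.
          this.states = [];
          this.form.get('state')?.setValue('');
          this.loadingStates = !!countryId;
        }),
        // switchMap replaces the nested subscribe: picking a new country
        // cancels the previous request, so a stale response cannot win.
        // <select> values arrive as strings, hence Number().
        switchMap(countryId =>
          countryId
            ? this.countryService.getStatesByCountryId(Number(countryId))
            : of([] as State[])
        )
      ).subscribe(states => {
        this.states = states;
        this.loadingStates = false;
      });
      this.subscriptions.add(valueChangeSub);
    }
  }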

  onSubmit() {
    console.log('Form submitted:', this.form.value);
  }

  ngOnDestroy() {
    this.subscriptions.unsubscribe();
  }
}
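
When one stream (the selected country) triggers a request per value, responses can arrive out of order. switchMap is one way to guarantee that only the latest request's result is used. The sketch below shows the cancellation in isolation. `fakeLoadStates` and the `pending` map are hypothetical stand-ins for an HTTP call, using Subjects so each "response" can be delivered by hand.

```typescript
import { Observable, Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

const countryId$ = new Subject<number>();

// One Subject per country stands in for a pending HTTP response.
const pending = new Map<number, Subject<string[]>>();

function fakeLoadStates(id: number): Observable<string[]> {
  const response = new Subject<string[]>();
  pending.set(id, response);
  return response;
}

const shown: string[][] = [];

countryId$
  .pipe(switchMap(id => fakeLoadStates(id)))
  .subscribe(states => shown.push(states));

countryId$.next(1); // user picks country 1
countryId$.next(2); // user switches to country 2 before any response arrives

pending.get(1)!.next(['stale-state']);   // late response for country 1: dropped
pending.get(2)!.next(['current-state']); // response for the current selection

console.log(shown); // [['current-state']]
```

With a nested subscribe, both responses would be applied, and whichever arrived last would end up on screen.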

📝 HTML Template (employee-form.component.html)


<form [formGroup]="form" (ngSubmit)="onSubmit()">
  <label>Name:</label>
  <input formControlName="name" type="text" class="form-control" />

  <label>Country:</label>
  <select formControlName="country" class="form-select">
    <option value="">Select Country</option>
    <option *ngFor="let country of countries" [value]="country.id">
      {{ country.name }}
    </option>
  </select>
  <span *ngIf="loadingCountries">Loading countries...</span>

  <label>State:</label>
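  <!-- [disabled] is ignored (with a warning) on reactive-form controls, so set the attribute directly. -->
  <select formControlName="state" [attr.disabled]="states.length ? null : true" class="form-select">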
    <option value="">Select State</option>
    <option *ngFor="let state of states" [value]="state.id">
      {{ state.name }}
    </option>
  </select>
  <span *ngIf="loadingStates">Loading states...</span>

  <button type="submit" class="btn btn-primary mt-3">Submit</button>
</form>

📊 RxJS Flow Diagram

[Figure: RxJS flow diagram]

🧠 Conclusion

RxJS is foundational in Angular for building reactive applications. Mastering observables, operators, and subscription lifecycles will significantly improve your ability to build scalable, performant apps.
