mirror of
https://github.com/bec-project/bec_atlas.git
synced 2025-07-13 22:51:49 +02:00
fix: connect to selected deployment; reconnect when necessary
This commit is contained in:
@ -4,23 +4,31 @@ import { Observable } from 'rxjs';
|
|||||||
import { MessageEndpoints, EndpointInfo } from './redis_endpoints';
|
import { MessageEndpoints, EndpointInfo } from './redis_endpoints';
|
||||||
import { AppConfigService } from '../app-config.service';
|
import { AppConfigService } from '../app-config.service';
|
||||||
import { ServerSettingsService } from '../server-settings.service';
|
import { ServerSettingsService } from '../server-settings.service';
|
||||||
|
import { DeploymentService } from '../deployment.service';
|
||||||
|
|
||||||
@Injectable({
|
@Injectable()
|
||||||
providedIn: 'root',
|
|
||||||
})
|
|
||||||
export class RedisConnectorService {
|
export class RedisConnectorService {
|
||||||
private socket!: Socket;
|
private socket: Socket | null = null;
|
||||||
private signals: Map<string, WritableSignal<any>> = new Map();
|
private signals: Map<string, WritableSignal<any>> = new Map();
|
||||||
private signalReferenceCount: Map<string, number> = new Map();
|
private signalReferenceCount: Map<string, number> = new Map();
|
||||||
|
|
||||||
constructor(private serverSettings: ServerSettingsService) {
|
constructor(
|
||||||
this.connect();
|
private serverSettings: ServerSettingsService,
|
||||||
|
private deploymentService: DeploymentService
|
||||||
|
) {
|
||||||
|
this.deploymentService.selectedDeployment.subscribe((deployment) => {
|
||||||
|
this.disconnect();
|
||||||
|
if (!deployment) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.connect(deployment._id);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Connect to the WebSocket server using socket.io
|
* Connect to the WebSocket server using socket.io
|
||||||
*/
|
*/
|
||||||
private connect(): void {
|
private connect(id: string): void {
|
||||||
this.socket = io(this.serverSettings.getSocketAddress(), {
|
this.socket = io(this.serverSettings.getSocketAddress(), {
|
||||||
transports: ['websocket'], // Use WebSocket only
|
transports: ['websocket'], // Use WebSocket only
|
||||||
autoConnect: true, // Automatically connect
|
autoConnect: true, // Automatically connect
|
||||||
@ -30,7 +38,7 @@ export class RedisConnectorService {
|
|||||||
auth: {
|
auth: {
|
||||||
user: 'john_doe',
|
user: 'john_doe',
|
||||||
token: '1234',
|
token: '1234',
|
||||||
deployment: '678aa8d4875568640bd92176',
|
deployment: id,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -79,6 +87,10 @@ export class RedisConnectorService {
|
|||||||
* @param data Data to send
|
* @param data Data to send
|
||||||
*/
|
*/
|
||||||
public emit(event: string, data: any): void {
|
public emit(event: string, data: any): void {
|
||||||
|
if (!this.socket) {
|
||||||
|
console.error('Socket not connected');
|
||||||
|
return;
|
||||||
|
}
|
||||||
this.socket.emit(event, data);
|
this.socket.emit(event, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -122,12 +134,19 @@ export class RedisConnectorService {
|
|||||||
*/
|
*/
|
||||||
public on<T>(event: string): Observable<T> {
|
public on<T>(event: string): Observable<T> {
|
||||||
return new Observable<T>((observer) => {
|
return new Observable<T>((observer) => {
|
||||||
|
if (!this.socket) {
|
||||||
|
console.error('Socket not connected');
|
||||||
|
return;
|
||||||
|
}
|
||||||
this.socket.on(event, (data: T) => {
|
this.socket.on(event, (data: T) => {
|
||||||
observer.next(data);
|
observer.next(data);
|
||||||
});
|
});
|
||||||
|
|
||||||
// Cleanup when unsubscribed
|
// Cleanup when unsubscribed
|
||||||
return () => {
|
return () => {
|
||||||
|
if (!this.socket) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
this.socket.off(event);
|
this.socket.off(event);
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
|
Reference in New Issue
Block a user