From: jxnshi Date: Tue, 25 Feb 2025 13:46:26 +0000 (+0100) Subject: Working on sending messages X-Git-Url: https://jxnshi.xyz/repos?a=commitdiff_plain;h=2339fdbc035880d7f5c27e6caeba87218f3a4789;p=mesange.git Working on sending messages --- diff --git a/client-cli/client-cli b/client-cli/client-cli index b281996..1d90f42 100755 Binary files a/client-cli/client-cli and b/client-cli/client-cli differ diff --git a/common/request.odin b/common/request.odin index c74af9e..9825c63 100644 --- a/common/request.odin +++ b/common/request.odin @@ -33,6 +33,10 @@ Client_Request_From_Bytes_Error :: enum { Invalid_Public_Key, } +Server_Request_From_Bytes_Error :: enum { + Invalid_Kind, +} + Request_Verify_Error :: enum { Expired, Invalid_Signature, @@ -44,6 +48,7 @@ Error :: union { Receive_Message_Error, Client_Request_From_Bytes_Error, + Server_Request_From_Bytes_Error, Request_Verify_Error, @@ -53,6 +58,8 @@ Error :: union { Room_ID :: [sha3.DIGEST_SIZE_256]u8 +Sync_Request :: struct {} + Push_Request :: struct { room_id: Room_ID, message: [MESSAGE_SIZE]u8, @@ -134,6 +141,8 @@ unsubscribe_request_to_bytes :: proc(stream: io.Stream, request: Unsubscribe_Req } Client_Request_Inner :: union { + Sync_Request, + Push_Request, Subscribe_Request, @@ -146,10 +155,12 @@ client_request_inner_from_bytes :: proc(stream: io.Stream) -> (request: Client_R inner: Client_Request_Inner switch kind { - case 0: inner = push_request_from_bytes(stream) or_return + case 0: inner = Sync_Request{} - case 1: inner = subscribe_request_from_bytes(stream) or_return - case 2: inner = unsubscribe_request_from_bytes(stream) or_return + case 1: inner = push_request_from_bytes(stream) or_return + + case 2: inner = subscribe_request_from_bytes(stream) or_return + case 3: inner = unsubscribe_request_from_bytes(stream) or_return case: return {}, Client_Request_From_Bytes_Error.Invalid_Inner_Kind } @@ -159,16 +170,19 @@ client_request_inner_from_bytes :: proc(stream: io.Stream) -> (request: Client_R client_request_inner_to_bytes :: proc(stream: io.Stream, inner: Client_Request_Inner) -> Error { switch i in inner { - case Push_Request: + case Sync_Request: io.write_byte(stream, 0) or_return + + case Push_Request: + io.write_byte(stream, 1) or_return push_request_to_bytes(stream, i) or_return case Subscribe_Request: - io.write_byte(stream, 1) or_return + io.write_byte(stream, 2) or_return subscribe_request_to_bytes(stream, i) or_return case Unsubscribe_Request: - io.write_byte(stream, 2) or_return + io.write_byte(stream, 3) or_return unsubscribe_request_to_bytes(stream, i) or_return } @@ -280,7 +294,31 @@ client_request_verify :: proc(request: Client_Request, expiration: Maybe(i64)) - return nil } -Server_Request :: struct {} +Server_Request :: union { + Push_Request, +} + +server_request_from_bytes :: proc(stream: io.Stream) -> (request: Server_Request, err: Error) { + kind := io.read_byte(stream) or_return + + switch kind { + case 0: request = push_request_from_bytes(stream) or_return + + case: return {}, Server_Request_From_Bytes_Error.Invalid_Kind + } + + return request, nil +} + +server_request_to_bytes :: proc(stream: io.Stream, request: Server_Request) -> Error { + switch r in request { + case Push_Request: + io.write_byte(stream, 0) or_return + push_request_to_bytes(stream, r) or_return + } + + return nil +} Server_Response :: union { string, @@ -320,6 +358,7 @@ server_message_from_bytes :: proc(server: net.TCP_Socket, stream: io.Stream) -> switch message_type { // Request. case 0: + message = server_request_from_bytes(stream) or_return // Response. case 1: diff --git a/server/client.odin b/server/client.odin index ea2bc7a..e76aa61 100644 --- a/server/client.odin +++ b/server/client.odin @@ -3,6 +3,7 @@ package main import "core:bytes" import "core:encoding/endian" import "core:io" +import "core:math/rand" import "core:mem" import "core:net" import "core:slice" @@ -27,7 +28,39 @@ receive_request :: proc(client: net.TCP_Socket) -> (id: u32, request: common.Cli } send_request :: proc(client: net.TCP_Socket, request: common.Server_Request) -> common.Error { - panic("TODO") + bytes_buf: [1_000]u8 + bytes_buffer: bytes.Buffer + bytes_buffer.buf = slice.into_dynamic(bytes_buf[:]) + request_stream := bytes.buffer_to_stream(&bytes_buffer) + + // Message type. + _ = io.write_byte(request_stream, 0) + + // Request ID. + id_bytes: [size_of(u32)]u8 + _ = rand.read(id_bytes[:]) + + id, _ := endian.get_u32(id_bytes[:], .Little) + + _, _ = io.write(request_stream, id_bytes[:]) + + // Request. + common.server_request_to_bytes(request_stream, request) or_return + + // Length. + buffer_length := bytes.buffer_length(&bytes_buffer) + + len_bytes: [size_of(32)]u8 + endian.put_u32(len_bytes[:], .Little, u32(buffer_length)) + + mem.copy(raw_data(bytes_buf[size_of(u32):]), raw_data(bytes_buf[:buffer_length]), buffer_length) + mem.copy_non_overlapping(raw_data(bytes_buf[:]), raw_data(len_bytes[:]), size_of(u32)) + + response_bytes := bytes_buf[:size_of(u32) + buffer_length] + + net.send_tcp(client, response_bytes) + + return nil } send_response :: proc(client: net.TCP_Socket, id: u32, content: string) -> common.Error { diff --git a/server/main.odin b/server/main.odin index 122c123..6b6169a 100644 --- a/server/main.odin +++ b/server/main.odin @@ -20,18 +20,24 @@ Subscription :: struct { room_name: [common.ROOM_NAME_SIZE]u8, } +Client_Info :: struct { + endpoint: net.Endpoint, + socket: net.TCP_Socket, + public_key: ed25519.Public_Key, +} + App :: struct { data_mutex: sync.Mutex, db_mutex: sync.Mutex, - clients_sockets: map[net.Endpoint]net.TCP_Socket, + clients_info: [dynamic]Client_Info, subscriptions: [dynamic]Subscription, } app_init :: proc() -> App { return { - clients_sockets = make(map[net.Endpoint]net.TCP_Socket), + clients_info = make([dynamic]Client_Info), subscriptions = make([dynamic]Subscription), } @@ -40,17 +46,51 @@ app_init :: proc() -> App { app_deinit :: proc(app: ^App) { delete(app.subscriptions) - for _, socket in app.clients_sockets { - net.close(socket) + for client_info in app.clients_info { + net.close(client_info.socket) + } + + delete(app.clients_info) +} + +app_get_client_info_by_endpoint :: proc(app: ^App, endpoint: net.Endpoint) -> ^Client_Info { + for &client_info in app.clients_info { + if client_info.endpoint == endpoint { + return &client_info + } + } + + return nil +} + +app_get_client_info_by_public_key :: proc(app: ^App, public_key: ed25519.Public_Key) -> ^Client_Info { + public_key := public_key + + for &client_info in app.clients_info { + if ed25519.public_key_equal(&client_info.public_key, &public_key) { + return &client_info + } } - delete(app.clients_sockets) + return nil +} + +app_delete_client_info :: proc(app: ^App, endpoint: net.Endpoint) { + sync.mutex_lock(&app.data_mutex) + defer sync.mutex_unlock(&app.data_mutex) + + for client_info, i in app.clients_info { + if client_info.endpoint == endpoint { + net.close(client_info.socket) + unordered_remove(&app.clients_info, i) + + break + } + } } Handle_Client_Data :: struct { app: ^App, - - client_socket: net.TCP_Socket, client_endpoint: net.Endpoint, } @@ -58,14 +98,19 @@ handle_client :: proc(data: Handle_Client_Data) { using data client_endpoint_string := net.to_string(client_endpoint) + client_socket: net.TCP_Socket - defer { - log.infof("Client %v disconnected.", client_endpoint_string) - + { sync.mutex_lock(&app.data_mutex) defer sync.mutex_unlock(&app.data_mutex) - delete_key(&app.clients_sockets, client_endpoint) + client_info := app_get_client_info_by_endpoint(app, client_endpoint) + client_socket = client_info.socket + } + + defer { + log.infof("Client %v disconnected.", client_endpoint_string) + app_delete_client_info(app, client_endpoint) } recv_buffer: [10_000]u8 @@ -110,6 +155,34 @@ handle_client :: proc(data: Handle_Client_Data) { log.infof("Received request: %v", request) #partial switch &inner in request.inner { + case common.Sync_Request: + sync.mutex_lock(&app.data_mutex) + defer sync.mutex_unlock(&app.data_mutex) + + client_info := app_get_client_info_by_endpoint(app, client_endpoint) + + if client_info != nil { + client_info.public_key = request.public_key + } + + case common.Push_Request: + sync.mutex_lock(&app.data_mutex) + defer sync.mutex_unlock(&app.data_mutex) + + for &subscription in app.subscriptions { + if slice.equal(subscription.room_id[:], inner.room_id[:]) { + client_info := app_get_client_info_by_public_key(app, subscription.public_key) + if client_info == nil do continue + + request := common.Push_Request{ + room_id = inner.room_id, + message = inner.message, + } + + send_request(client_info.socket, request) + } + } + case common.Subscribe_Request: subscription := Subscription{ public_key = request.public_key, @@ -214,13 +287,16 @@ main :: proc() { sync.mutex_lock(&app.data_mutex) defer sync.mutex_unlock(&app.data_mutex) - app.clients_sockets[client_endpoint] = client_socket + client_info := Client_Info{ + endpoint = client_endpoint, + socket = client_socket, + } + + append(&app.clients_info, client_info) } client_handle_data := Handle_Client_Data{ app = &app, - - client_socket = client_socket, client_endpoint = client_endpoint, } diff --git a/server/server b/server/server index 06fc0c6..f857406 100755 Binary files a/server/server and b/server/server differ