Invalid_Public_Key,
}
+Server_Request_From_Bytes_Error :: enum {
+ Invalid_Kind,
+}
+
Request_Verify_Error :: enum {
Expired,
Invalid_Signature,
Receive_Message_Error,
Client_Request_From_Bytes_Error,
+ Server_Request_From_Bytes_Error,
Request_Verify_Error,
Room_ID :: [sha3.DIGEST_SIZE_256]u8
+Sync_Request :: struct {}
+
Push_Request :: struct {
room_id: Room_ID,
message: [MESSAGE_SIZE]u8,
}
Client_Request_Inner :: union {
+ Sync_Request,
+
Push_Request,
Subscribe_Request,
inner: Client_Request_Inner
switch kind {
- case 0: inner = push_request_from_bytes(stream) or_return
+ case 0: inner = Sync_Request{}
- case 1: inner = subscribe_request_from_bytes(stream) or_return
- case 2: inner = unsubscribe_request_from_bytes(stream) or_return
+ case 1: inner = push_request_from_bytes(stream) or_return
+
+ case 2: inner = subscribe_request_from_bytes(stream) or_return
+ case 3: inner = unsubscribe_request_from_bytes(stream) or_return
case: return {}, Client_Request_From_Bytes_Error.Invalid_Inner_Kind
}
client_request_inner_to_bytes :: proc(stream: io.Stream, inner: Client_Request_Inner) -> Error {
switch i in inner {
- case Push_Request:
+ case Sync_Request:
io.write_byte(stream, 0) or_return
+
+ case Push_Request:
+ io.write_byte(stream, 1) or_return
push_request_to_bytes(stream, i) or_return
case Subscribe_Request:
- io.write_byte(stream, 1) or_return
+ io.write_byte(stream, 2) or_return
subscribe_request_to_bytes(stream, i) or_return
case Unsubscribe_Request:
- io.write_byte(stream, 2) or_return
+ io.write_byte(stream, 3) or_return
unsubscribe_request_to_bytes(stream, i) or_return
}
return nil
}
-Server_Request :: struct {}
+Server_Request :: union {
+ Push_Request,
+}
+
+server_request_from_bytes :: proc(stream: io.Stream) -> (request: Server_Request, err: Error) {
+ kind := io.read_byte(stream) or_return
+
+ switch kind {
+ case 0: request = push_request_from_bytes(stream) or_return
+
+ case: return {}, Server_Request_From_Bytes_Error.Invalid_Kind
+ }
+
+ return request, nil
+}
+
+server_request_to_bytes :: proc(stream: io.Stream, request: Server_Request) -> Error {
+ switch r in request {
+ case Push_Request:
+ io.write_byte(stream, 0) or_return
+ push_request_to_bytes(stream, r) or_return
+ }
+
+ return nil
+}
Server_Response :: union {
string,
switch message_type {
// Request.
case 0:
+ message = server_request_from_bytes(stream) or_return
// Response.
case 1:
import "core:bytes"
import "core:encoding/endian"
import "core:io"
+import "core:math/rand"
import "core:mem"
import "core:net"
import "core:slice"
}
send_request :: proc(client: net.TCP_Socket, request: common.Server_Request) -> common.Error {
- panic("TODO")
+ bytes_buf: [1_000]u8
+ bytes_buffer: bytes.Buffer
+ bytes_buffer.buf = slice.into_dynamic(bytes_buf[:])
+ request_stream := bytes.buffer_to_stream(&bytes_buffer)
+
+ // Message type.
+ _ = io.write_byte(request_stream, 0)
+
+ // Request ID.
+ id_bytes: [size_of(u32)]u8
+ _ = rand.read(id_bytes[:])
+
+ id, _ := endian.get_u32(id_bytes[:], .Little)
+
+ _, _ = io.write(request_stream, id_bytes[:])
+
+ // Request.
+ common.server_request_to_bytes(request_stream, request) or_return
+
+ // Length.
+ buffer_length := bytes.buffer_length(&bytes_buffer)
+
+ len_bytes: [size_of(32)]u8
+ endian.put_u32(len_bytes[:], .Little, u32(buffer_length))
+
+ mem.copy(raw_data(bytes_buf[size_of(u32):]), raw_data(bytes_buf[:buffer_length]), buffer_length)
+ mem.copy_non_overlapping(raw_data(bytes_buf[:]), raw_data(len_bytes[:]), size_of(u32))
+
+ response_bytes := bytes_buf[:size_of(u32) + buffer_length]
+
+ net.send_tcp(client, response_bytes)
+
+ return nil
}
send_response :: proc(client: net.TCP_Socket, id: u32, content: string) -> common.Error {
room_name: [common.ROOM_NAME_SIZE]u8,
}
+Client_Info :: struct {
+ endpoint: net.Endpoint,
+ socket: net.TCP_Socket,
+ public_key: ed25519.Public_Key,
+}
+
App :: struct {
data_mutex: sync.Mutex,
db_mutex: sync.Mutex,
- clients_sockets: map[net.Endpoint]net.TCP_Socket,
+ clients_info: [dynamic]Client_Info,
subscriptions: [dynamic]Subscription,
}
app_init :: proc() -> App {
return {
- clients_sockets = make(map[net.Endpoint]net.TCP_Socket),
+ clients_info = make([dynamic]Client_Info),
subscriptions = make([dynamic]Subscription),
}
app_deinit :: proc(app: ^App) {
delete(app.subscriptions)
- for _, socket in app.clients_sockets {
- net.close(socket)
+ for client_info in app.clients_info {
+ net.close(client_info.socket)
+ }
+
+ delete(app.clients_info)
+}
+
+app_get_client_info_by_endpoint :: proc(app: ^App, endpoint: net.Endpoint) -> ^Client_Info {
+ for &client_info in app.clients_info {
+ if client_info.endpoint == endpoint {
+ return &client_info
+ }
+ }
+
+ return nil
+}
+
+app_get_client_info_by_public_key :: proc(app: ^App, public_key: ed25519.Public_Key) -> ^Client_Info {
+ public_key := public_key
+
+ for &client_info in app.clients_info {
+ if ed25519.public_key_equal(&client_info.public_key, &public_key) {
+ return &client_info
+ }
}
- delete(app.clients_sockets)
+ return nil
+}
+
+app_delete_client_info :: proc(app: ^App, endpoint: net.Endpoint) {
+ sync.mutex_lock(&app.data_mutex)
+ defer sync.mutex_unlock(&app.data_mutex)
+
+ for client_info, i in app.clients_info {
+ if client_info.endpoint == endpoint {
+ net.close(client_info.socket)
+ unordered_remove(&app.clients_info, i)
+
+ break
+ }
+ }
}
Handle_Client_Data :: struct {
app: ^App,
-
- client_socket: net.TCP_Socket,
client_endpoint: net.Endpoint,
}
using data
client_endpoint_string := net.to_string(client_endpoint)
+ client_socket: net.TCP_Socket
- defer {
- log.infof("Client %v disconnected.", client_endpoint_string)
-
+ {
sync.mutex_lock(&app.data_mutex)
defer sync.mutex_unlock(&app.data_mutex)
- delete_key(&app.clients_sockets, client_endpoint)
+ client_info := app_get_client_info_by_endpoint(app, client_endpoint)
+ client_socket = client_info.socket
+ }
+
+ defer {
+ log.infof("Client %v disconnected.", client_endpoint_string)
+ app_delete_client_info(app, client_endpoint)
}
recv_buffer: [10_000]u8
log.infof("Received request: %v", request)
#partial switch &inner in request.inner {
+ case common.Sync_Request:
+ sync.mutex_lock(&app.data_mutex)
+ defer sync.mutex_unlock(&app.data_mutex)
+
+ client_info := app_get_client_info_by_endpoint(app, client_endpoint)
+
+ if client_info != nil {
+ client_info.public_key = request.public_key
+ }
+
+ case common.Push_Request:
+ sync.mutex_lock(&app.data_mutex)
+ defer sync.mutex_unlock(&app.data_mutex)
+
+ for &subscription in app.subscriptions {
+ if slice.equal(subscription.room_id[:], inner.room_id[:]) {
+ client_info := app_get_client_info_by_public_key(app, subscription.public_key)
+ if client_info == nil do continue
+
+ request := common.Push_Request{
+ room_id = inner.room_id,
+ message = inner.message,
+ }
+
+ send_request(client_info.socket, request)
+ }
+ }
+
case common.Subscribe_Request:
subscription := Subscription{
public_key = request.public_key,
sync.mutex_lock(&app.data_mutex)
defer sync.mutex_unlock(&app.data_mutex)
- app.clients_sockets[client_endpoint] = client_socket
+ client_info := Client_Info{
+ endpoint = client_endpoint,
+ socket = client_socket,
+ }
+
+ append(&app.clients_info, client_info)
}
client_handle_data := Handle_Client_Data{
app = &app,
-
- client_socket = client_socket,
client_endpoint = client_endpoint,
}