Initial Commit

Initial commit of Code Base.
This commit is contained in:
Mario Steele 2025-06-12 14:31:14 -05:00
parent 293b1213e1
commit c11a4ebbc2
653 changed files with 36893 additions and 1 deletions

View file

@ -0,0 +1,379 @@
@icon("res://addons/twitcher/assets/eventsub-icon.svg")
@tool
extends Twitcher
## Handles the evensub part of twitch. Returns the event data when receives it.
class_name TwitchEventsub
static var _log: TwitchLogger = TwitchLogger.new("TwitchEventsub")
static var instance: TwitchEventsub
## An object that identifies the message.
class Metadata extends RefCounted:
## An ID that uniquely identifies the message. Twitch sends messages at least once, but if Twitch is unsure of whether you received a notification, itll resend the message. This means you may receive a notification twice. If Twitch resends the message, the message ID is the same.
var message_id: String
## The type of message, which is set to session_keepalive.
var message_type: String
## The UTC date and time that the message was sent.
var message_timestamp: String
func _init(d: Dictionary):
message_id = d['message_id']
message_type = d['message_type']
message_timestamp = d['message_timestamp']
## An object that contains information about the connection.
class Session extends RefCounted:
## An ID that uniquely identifies this WebSocket connection. Use this ID to set the session_id field in all subscription requests.
var id: String
## The connections status, which is set to connected.
var status: String
## The maximum number of seconds that you should expect silence before receiving a keepalive message. For a welcome message, this is the number of seconds that you have to subscribe to an event after receiving the welcome message. If you dont subscribe to an event within this window, the socket is disconnected.
var keepalive_timeout_seconds: int
## The URL to reconnect to if you get a Reconnect message. Is set to null.
var reconnect_url: String
## The UTC date and time that the connection was created.
var connected_at: String
func _init(d: Dictionary):
id = d["id"]
status = d["status"]
var timeout = d["keepalive_timeout_seconds"]
keepalive_timeout_seconds = timeout if timeout != null else 30
if d["reconnect_url"] != null:
reconnect_url = d["reconnect_url"]
connected_at = d["connected_at"]
## A specific event received from eventsub
class Event extends RefCounted:
var type: TwitchEventsubDefinition:
get(): return TwitchEventsubDefinition.BY_NAME[message.payload.subscription.type]
var data: Dictionary:
get(): return message.payload.event
var message: TwitchNotificationMessage
func _init(notification_message: TwitchNotificationMessage) -> void:
message = notification_message
## Will be send as soon as the websocket connection is up and running you can use it to subscribe to events
signal session_id_received(id: String)
## Will be called when an event is sent from Twitch.
signal event(type: StringName, data: Dictionary)
## Will be called when an event is sent from Twitch. Same like event signal but better named and easier to use in inline awaits.
signal event_received(event: Event)
## Will be called when an event got revoked from your subscription by Twitch.
signal events_revoked(type: StringName, status: String)
## Called when any eventsub message is received for low level access
signal message_received(message: Variant)
@export var api: TwitchAPI
@export var _subscriptions: Array[TwitchEventsubConfig] = []
@export var scopes: OAuthScopes
@export var eventsub_live_server_url: String = "wss://eventsub.wss.twitch.tv/ws"
@export var eventsub_test_server_url: String = "ws://127.0.0.1:8080/ws"
@export var use_test_server: bool
@export var ignore_message_eventsub_in_seconds: int = 600
var _client: WebsocketClient = WebsocketClient.new()
var _test_client : WebsocketClient = WebsocketClient.new()
## Swap over client in case Twitch sends us the message for a new server.
## See: https://dev.twitch.tv/docs/eventsub/handling-websocket-events/#reconnect-message
var _swap_over_client : WebsocketClient
var session: Session
## Holds the messages that was processed already.
## Key: MessageID Value: Timestamp
var eventsub_messages: Dictionary = {}
var last_keepalive: int
var is_open: bool:
get(): return _client.is_open
var _should_connect: bool
## When the Websocket server is shutting down and the client is doing a
## gracefull handover
var _swap_over_process: bool
## queues the actions that should be executed when the connection is established
var _action_stack: Array[SubscriptionAction]
var _executing_action_stack: bool
## Increased on every reconnect without subscriptions
var _empty_connections: int
## Determines the action that the subscription should do
class SubscriptionAction extends RefCounted:
var subscribe: bool
var subscription: TwitchEventsubConfig
func _to_string() -> String:
return "%s %s" % [("Subscribe to" if subscribe else "Unsubscribe from"), subscription.definition.get_readable_name()]
func _init() -> void:
_client.connection_url = eventsub_live_server_url
_client.message_received.connect(_data_received)
_client.connection_established.connect(_on_connection_established)
_client.connection_closed.connect(_on_connection_closed)
_test_client.connection_url = eventsub_test_server_url
_test_client.message_received.connect(_data_received)
func _ready() -> void:
_client.name = "Websocket Client"
add_child(_client)
if use_test_server:
_test_client.name = "Websocket Client Test"
add_child(_test_client)
if api == null: api = TwitchAPI.instance
func _enter_tree() -> void:
if instance == null: instance = self
func _exit_tree() -> void:
if instance == self: instance = null
## Propergated call from twitch service
func do_setup() -> void:
await open_connection()
_log.i("Eventsub setup")
func wait_setup() -> void:
await wait_for_session_established()
## Waits until the eventsub is fully established
func wait_for_session_established() -> void:
if session == null: await session_id_received
func _on_connection_established() -> void:
if not _swap_over_process:
_action_stack.clear()
if _subscriptions.is_empty(): _empty_connections += 1
if _empty_connections >= 3:
_empty_connections = 0
_log.e("Stopped eventsub cause of no subscription.")
close_connection()
return
# Resubscribe
_log.i("Connection established -> resubscribe to: [%s]" % [_subscriptions])
for sub in _subscriptions: _add_action(sub, true)
_execute_action_stack()
func _on_connection_closed() -> void:
session = null
func open_connection() -> void:
if _client.is_closed:
_client.open_connection()
if _test_client.is_closed && use_test_server:
_test_client.open_connection()
func close_connection() -> void:
if not _client.is_closed:
_client.close()
if not _test_client.is_closed:
_test_client.close()
## Add a new subscription
func subscribe(eventsub_config: TwitchEventsubConfig) -> void:
_log.i("Subscribe to %s" % eventsub_config.definition.get_readable_name())
_subscriptions.append(eventsub_config)
_add_action(eventsub_config, true)
_empty_connections = 0
func has_subscription(eventsub_definition: TwitchEventsubDefinition, condition: Dictionary) -> bool:
for subscription: TwitchEventsubConfig in _subscriptions:
if subscription.definition == eventsub_definition && subscription.condition == condition:
return true
return false
## Remove a subscription
func unsubscribe(eventsub_config: TwitchEventsubConfig) -> void:
_subscriptions.erase(eventsub_config)
_add_action(eventsub_config, false)
## Process the queue of actions until its empty
func _execute_action_stack() -> void:
if _executing_action_stack: return
await wait_for_session_established()
_log.d("Execute actions [%s]" % [_action_stack])
_executing_action_stack = true
while not _action_stack.is_empty():
var action = _action_stack.pop_back()
var sub: TwitchEventsubConfig = action.subscription
if action.subscribe:
_subscribe(sub)
else:
_unsubscribe(sub)
_executing_action_stack = false
## Adds a subscribe or unsubscribe action to the queue
func _add_action(sub: TwitchEventsubConfig, subscribe: bool) -> void:
var sub_action = SubscriptionAction.new()
sub_action.subscription = sub
sub_action.subscribe = subscribe
_action_stack.append(sub_action)
_log.d("Add subscribe action: %s" % sub.definition.get_readable_name())
_execute_action_stack()
## Refer to https://dev.twitch.tv/docs/eventsub/eventsub-subscription-types/
## for details on which API versions are available and which conditions are required.
func _subscribe(subscription: TwitchEventsubConfig) -> String:
var event_name = subscription.definition.value
var version = subscription.definition.version
var conditions = subscription.condition
var data : TwitchCreateEventSubSubscription.Body = TwitchCreateEventSubSubscription.Body.new()
var transport : TwitchCreateEventSubSubscription.BodyTransport = TwitchCreateEventSubSubscription.BodyTransport.new()
data.type = event_name
data.version = version
data.condition = conditions
data.transport = transport
transport.method = "websocket"
transport.session_id = session.id
_log.d("Do subscribe: %s" % event_name)
var eventsub_response = await api.create_eventsub_subscription(data)
if eventsub_response.response.response_code == 401:
_log.e("Subscription failed for '%s': Missing authentication for eventsub. The token got not authenticated yet. Please login!" % data.type)
_client.close(3000, "Missing Authentication")
return ""
elif eventsub_response.response.response_code == 403:
_log.e("Subscription failed for '%s': The token is missing proper scopes. [url='%s']Please check documentation[/url]!" % [data.type, subscription.definition.documentation_link])
_log.d(eventsub_response.response.response_data.get_string_from_utf8())
_client.close(3003, "Missing Authorization")
return ""
if eventsub_response.response.response_code < 200 || eventsub_response.response.response_code >= 300:
_log.e("Subscription failed for '%s'. Unknown error %s: %s" % [data.type, eventsub_response.response.response_code, eventsub_response.response.response_data.get_string_from_utf8()])
return ""
elif (eventsub_response.response.response_data.is_empty()):
return ""
_log.i("Now listening to '%s' events." % data.type)
var result = JSON.parse_string(eventsub_response.response.response_data.get_string_from_utf8())
return result.data[0].id
## Unsubscribes from an eventsub in case of an error returns false
func _unsubscribe(subscription: TwitchEventsubConfig) -> bool:
var response = await api.delete_eventsub_subscription(subscription.id)
return response.error || response.response_code != 200
func _data_received(data : PackedByteArray) -> void:
var message_str : String = data.get_string_from_utf8()
var message_json : Dictionary = JSON.parse_string(message_str)
if not message_json.has("metadata"):
_log.e("Twitch send something undocumented: %s" % message_str)
return
var metadata : Metadata = Metadata.new(message_json["metadata"])
var id = metadata.message_id
var timestamp_str = metadata.message_timestamp
var timestamp = Time.get_unix_time_from_datetime_string(timestamp_str)
if(_message_got_processed(id) || _message_is_to_old(timestamp)):
return
eventsub_messages[id] = timestamp
last_keepalive = Time.get_ticks_msec()
match metadata.message_type:
"session_welcome":
var welcome_message = TwitchWelcomeMessage.new(message_json)
session = welcome_message.payload.session
session_id_received.emit(session.id)
_log.i("Session established %s" % session.id)
message_received.emit(welcome_message)
"session_keepalive":
# Notification from server that the connection is still alive
var keep_alive_message = TwitchKeepaliveMessage.new(message_json)
message_received.emit(keep_alive_message)
pass
"session_reconnect":
var reconnect_message = TwitchReconnectMessage.new(message_json)
message_received.emit(reconnect_message)
_handle_reconnect(reconnect_message)
"revocation":
var revocation_message = TwitchRevocationMessage.new(message_json)
message_received.emit(revocation_message)
events_revoked.emit(revocation_message.payload.subscription.type,
revocation_message.payload.subscription.status)
"notification":
var notification_message = TwitchNotificationMessage.new(message_json)
message_received.emit(notification_message)
event.emit(notification_message.payload.subscription.type,
notification_message.payload.event)
event_received.emit(Event.new(notification_message))
_cleanup()
func _handle_reconnect(reconnect_message: TwitchReconnectMessage):
_log.i("Session is forced to reconnect")
_swap_over_process = true
var reconnect_url = reconnect_message.payload.session.reconnect_url
_swap_over_client = WebsocketClient.new()
_swap_over_client.message_received.connect(_data_received)
_swap_over_client.connection_established.connect(_on_connection_established)
_swap_over_client.connection_url = reconnect_url
add_child(_swap_over_client)
_swap_over_client.open_connection()
await session_id_received
_client.close(1000, "Closed cause of reconnect.")
remove_child(_client)
_client = _swap_over_client
_swap_over_client = null
_swap_over_process = false
_log.i("Session reconnected on %s" % reconnect_url)
## Cleanup old messages that won't be processed anymore cause of time to prevent a
## memory problem on long runinng applications.
func _cleanup() -> void:
for message_id in eventsub_messages.keys():
var timestamp = eventsub_messages[message_id]
if _message_is_to_old(timestamp):
eventsub_messages.erase(message_id)
func _message_got_processed(message_id: String) -> bool:
return eventsub_messages.has(message_id)
func _message_is_to_old(timestamp: int) -> bool:
return timestamp < Time.get_unix_time_from_system() - ignore_message_eventsub_in_seconds
func get_client() -> WebsocketClient:
return _client
func get_test_client() -> WebsocketClient:
return _test_client
## Returns a copy of the current subscribed events. Don't modify the result they won't get applied anyway.
func get_subscriptions() -> Array[TwitchEventsubConfig]:
return _subscriptions.duplicate()