pokepurple/addons/twitcher/media/twitch_media_loader.gd

436 lines
17 KiB
GDScript3
Raw Permalink Normal View History

@icon("res://addons/twitcher/assets/media-loader-icon.svg")
@tool
extends Twitcher
## Loads emotes, badges, cheermotes and profile images and keeps them cached as resources.
class_name TwitchMediaLoader
## Logger shared by all instances.
static var _log: TwitchLogger = TwitchLogger.new("TwitchMediaLoader")
## Shared instance; assigned in _enter_tree() when still empty and cleared in _exit_tree().
static var instance: TwitchMediaLoader
## Emitted after an emote was successfully loaded and converted
signal emoji_loaded(definition: TwitchEmoteDefinition)
## Default value for fallback_texture.
const FALLBACK_TEXTURE = preload("res://addons/twitcher/assets/fallback_texture.tres")
## Default value for fallback_profile.
const FALLBACK_PROFILE = preload("res://addons/twitcher/assets/no_profile.png")
## API used to fetch emote, badge and cheermote metadata.
## Falls back to TwitchAPI.instance in _ready() when left empty.
@export var api: TwitchAPI
## Converts downloaded image data into SpriteFrames.
## Assigning a new transformer refreshes the configuration warnings.
@export var image_transformer: TwitchImageTransformer = TwitchImageTransformer.new():
set(val):
image_transformer = val
update_configuration_warnings()
## Used as the single frame when no request can be created for a cheermote image.
@export var fallback_texture: Texture2D = FALLBACK_TEXTURE
## Used when a profile image can't be resolved.
@export var fallback_profile: Texture2D = FALLBACK_PROFILE
## Host that is prepended to the emote image request path.
@export var image_cdn_host: String = "https://static-cdn.jtvnw.net"
## When true, _load_cache() always loads the emote and badge cache from disk
## (use it when you make an editor plugin with Twitch support)
@export var load_cache_in_editor: bool
## Directory of the emote cache.
@export_global_dir var cache_emote: String = "user://emotes"
## Directory of the badge cache.
@export_global_dir var cache_badge: String = "user://badges"
## Directory of the cheermote cache.
@export_global_dir var cache_cheermote: String = "user://cheermote"
## Cache paths of all emote requests that are currently in progress
var _requests_in_progress : Array[StringName]
## Badge definitions per channel id ("global" for the global ones).
var _cached_badges : Dictionary = {}
## Emote definitions per channel id ("global" for the global ones).
var _cached_emotes : Dictionary = {}
## Key: String Cheer Prefix | Value: TwitchCheermote
var _cached_cheermotes: Dictionary[String, TwitchCheermote] = {}
## All cached emotes, badges, cheermotes.
## Keeps a reference to every loaded resource so that the cache isn't freed.
var _cached_images : Array[SpriteFrames] = []
## Captures the "https://host" part in front of the first slash of the path.
var _host_parser = RegEx.create_from_string("(https://.*?)/")
## Transformer used by _convert_response() for data that doesn't start with the GIF header.
var static_image_transformer = TwitchImageTransformer.new()
## HTTP client child node, created in _ready().
var _client: BufferedHTTPClient
## Creates the HTTP client child node, calls _load_cache() and falls back to
## TwitchAPI.instance when no api was assigned.
func _ready() -> void:
	var http_client := BufferedHTTPClient.new()
	http_client.name = "TwitchMediaLoaderClient"
	_client = http_client
	add_child(_client)
	_load_cache()
	if api == null:
		api = TwitchAPI.instance
## Registers this node as the shared instance unless one is already registered.
func _enter_tree() -> void:
	if instance != null:
		return
	instance = self
## Clears the shared instance, but only when this node is the registered one.
func _exit_tree() -> void:
	if instance != self:
		return
	instance = null
## Loads all cached emote and badge images from the cache directories into the memory cache.
## Inside the editor this is skipped unless load_cache_in_editor is enabled.
func _load_cache() -> void:
	# The previous condition (`is_editor_hint() || load_cache_in_editor`) was inverted:
	# it always loaded the cache inside the editor and never in a running game unless
	# the editor flag was set, so the on-disk cache was ignored at runtime.
	if Engine.is_editor_hint() and not load_cache_in_editor:
		return
	_cache_directory(cache_emote)
	_cache_directory(cache_badge)
## Loads every ".res" file of the directory as SpriteFrames and registers it under
## its path without the ".res" suffix. The directory is created when missing.
## Files that can't be loaded as SpriteFrames are skipped.
func _cache_directory(path: String) -> void:
	DirAccess.make_dir_recursive_absolute(path)
	for file: String in DirAccess.get_files_at(path):
		if not file.ends_with(".res"):
			continue
		var res_path: String = path.path_join(file)
		# ResourceLoader.load returns null for broken files; `as` also yields null for
		# resources of another type, so one bad file can't abort the whole cache load.
		var sprite_frames := ResourceLoader.load(res_path, "SpriteFrames") as SpriteFrames
		if sprite_frames == null:
			_log.i("Skip cache file that is not a SpriteFrames resource: %s" % res_path)
			continue
		sprite_frames.take_over_path(res_path.trim_suffix(".res"))
		# Hold a reference so the resource stays alive in the ResourceLoader cache.
		_cached_images.append(sprite_frames)
#region Emotes
## Fetches the emote definitions of the channel (or the global ones for "global")
## and stores them in the emote cache. Does nothing when the channel is already cached.
func preload_emotes(channel_id: String = "global") -> void:
	if _cached_emotes.has(channel_id):
		return
	var emote_response
	if channel_id == "global":
		_log.i("Preload global emotes")
		emote_response = await api.get_global_emotes()
	else:
		_log.i("Preload channel(%s) emotes" % channel_id)
		emote_response = await api.get_channel_emotes(channel_id)
	_cached_emotes[channel_id] = _map_emotes(emote_response)
## Returns the requested emotes.
## Key: EmoteID as String | Value: SpriteFrames
## Creates one TwitchEmoteDefinition per id and resolves them via get_emotes_by_definition().
func get_emotes(emote_ids : Array[String]) -> Dictionary[String, SpriteFrames]:
	# The ids have to be wrapped: a bare Array on the right side of `%` is used as the
	# list of format arguments, which is a format error for every size other than 1.
	_log.i("Get emotes: %s" % [emote_ids])
	var requests: Array[TwitchEmoteDefinition] = []
	for id: String in emote_ids:
		requests.append(TwitchEmoteDefinition.new(id))
	var emotes: Dictionary[TwitchEmoteDefinition, SpriteFrames] = await get_emotes_by_definition(requests)
	var result: Dictionary[String, SpriteFrames] = {}
	# Remap the emotes to their string id, which is easier to process for callers
	for requested_emote: TwitchEmoteDefinition in requests:
		result[requested_emote.id] = emotes[requested_emote]
	return result
## Resolves the SpriteFrames for every given emote definition.
## Key: TwitchEmoteDefinition | Value: SpriteFrames
## Definitions known to the ResourceLoader are loaded from there, the others are
## requested, converted and announced via emoji_loaded.
func get_emotes_by_definition(emote_definitions : Array[TwitchEmoteDefinition]) -> Dictionary[TwitchEmoteDefinition, SpriteFrames]:
	var loaded: Dictionary[TwitchEmoteDefinition, SpriteFrames] = {}
	var pending: Dictionary[TwitchEmoteDefinition, BufferedHTTPClient.RequestData] = {}

	# Step 1: answer cache hits and start the requests for everything else.
	for definition: TwitchEmoteDefinition in emote_definitions:
		var raw_path: String = _get_emote_cache_path(definition)
		var frames_path: String = _get_emote_cache_path_spriteframe(definition)
		if ResourceLoader.has_cached(frames_path):
			_log.d("Use cached emote %s" % definition)
			loaded[definition] = ResourceLoader.load(frames_path)
			continue
		# NOTE(review): both paths above are computed before the definition is switched
		# to static, step 2 computes them again afterwards - confirm that
		# get_file_name() doesn't depend on the type.
		if not image_transformer.is_supporting_animation():
			definition.type_static()
		# Already requested by someone else; gets resolved in step 3.
		if _requests_in_progress.has(raw_path):
			continue
		_requests_in_progress.append(raw_path)
		_log.d("Request emote %s" % definition)
		pending[definition] = _load_emote(definition)

	# Step 2: wait for the started requests and convert the received data.
	for definition: TwitchEmoteDefinition in pending:
		var raw_path: String = _get_emote_cache_path(definition)
		var frames_path: String = _get_emote_cache_path_spriteframe(definition)
		var frames: SpriteFrames = await _convert_response(pending[definition], raw_path, frames_path)
		loaded[definition] = frames
		_cached_images.append(frames)
		_requests_in_progress.erase(raw_path)
		emoji_loaded.emit(definition)

	# Step 3: everything still missing was skipped in step 1 and is loaded by path.
	for definition: TwitchEmoteDefinition in emote_definitions:
		if loaded.has(definition):
			continue
		loaded[definition] = ResourceLoader.load(_get_emote_cache_path_spriteframe(definition))
	return loaded
## Path inside the emote cache directory where the raw emote file is stored
func _get_emote_cache_path(emote_definition: TwitchEmoteDefinition) -> String:
	return cache_emote.path_join(emote_definition.get_file_name())
## Path inside the emote cache directory where the converted spriteframe (".res") is stored
func _get_emote_cache_path_spriteframe(emote_definition: TwitchEmoteDefinition) -> String:
	var res_name : String = "%s.res" % emote_definition.get_file_name()
	return cache_emote.path_join(res_name)
## Starts the GET request for the emote image and returns the request handle.
## The path is built from id, type, theme and scale of the definition.
func _load_emote(emote_definition : TwitchEmoteDefinition) -> BufferedHTTPClient.RequestData:
	var path_values : Array = [
		emote_definition.id,
		emote_definition._type,
		emote_definition._theme,
		emote_definition._scale,
	]
	var emote_url : String = image_cdn_host + "/emoticons/v2/%s/%s/%s/%1.1f" % path_values
	return _client.request(emote_url, HTTPClient.METHOD_GET, {}, "")
## Maps the "data" entries of an emote response by their emote id.
## Returns an empty dictionary when the response has no "data" array.
func _map_emotes(result: Variant) -> Dictionary:
	var mappings : Dictionary = {}
	# Stays untyped on purpose: assigning null to a variable typed as Array raises
	# an error before the missing data could be handled.
	var emotes = result.get("data")
	if not (emotes is Array):
		return mappings
	for emote in emotes:
		mappings[emote.get("id")] = emote
	return mappings
## Returns the emote definitions of the channel and preloads them first when missing.
func get_cached_emotes(channel_id) -> Dictionary:
	var is_missing : bool = not _cached_emotes.has(channel_id)
	if is_missing:
		await preload_emotes(channel_id)
	return _cached_emotes[channel_id]
#endregion
#region Badges
## Fetches the badge definitions of the channel (or the global ones for "global")
## and stores them in the badge cache. Does nothing when the channel is already cached.
func preload_badges(channel_id: String = "global") -> void:
	if _cached_badges.has(channel_id):
		return
	# TwitchGetGlobalChatBadges.Response | TwitchGetChannelChatBadges.Response
	var badge_response: Variant
	if channel_id == "global":
		_log.i("Preload global badges")
		badge_response = await api.get_global_chat_badges()
	else:
		_log.i("Preload channel(%s) badges" % channel_id)
		badge_response = await api.get_channel_chat_badges(channel_id)
	_cached_badges[channel_id] = _cache_badges(badge_response)
## Returns the requested badges either from cache or loads them from the web. Scale can be 1, 2 or 4.
## Key: TwitchBadgeDefinition | Value: SpriteFrames
func get_badges(badges: Array[TwitchBadgeDefinition]) -> Dictionary[TwitchBadgeDefinition, SpriteFrames]:
	var loaded: Dictionary[TwitchBadgeDefinition, SpriteFrames] = {}
	var pending: Dictionary[TwitchBadgeDefinition, BufferedHTTPClient.RequestData] = {}

	# Answer cache hits directly and start a request for every other badge.
	for definition : TwitchBadgeDefinition in badges:
		var cached_path : String = cache_badge.path_join(definition.get_cache_id())
		if ResourceLoader.has_cached(cached_path):
			_log.d("Use cached badge %s" % definition)
			loaded[definition] = ResourceLoader.load(cached_path)
			continue
		_log.d("Request badge %s" % definition)
		pending[definition] = await _load_badge(definition)

	# Wait for the started requests and convert the received data.
	for definition : TwitchBadgeDefinition in pending:
		var badge_request = pending[definition]
		var raw_path : String = cache_badge.path_join(definition.get_cache_id())
		var frames_path : String = raw_path + ".res"
		var frames : SpriteFrames = await _convert_response(badge_request, raw_path, frames_path)
		loaded[definition] = frames
		_cached_images.append(frames)
	return loaded
## Starts the request for the badge image and returns the request handle.
## Preloads the badge definitions of the channel when needed. When a channel doesn't
## have the badge, the definition is switched to "global" and resolved again.
func _load_badge(badge_definition: TwitchBadgeDefinition) -> BufferedHTTPClient.RequestData:
	var channel : String = badge_definition.channel
	var set_name : String = badge_definition.badge_set
	var version_id : String = badge_definition.badge_id
	var size_factor : int = badge_definition.scale

	if not _cached_badges.has(channel):
		await preload_badges(channel)

	var known_sets = _cached_badges[channel]
	var found : bool = false
	if known_sets.has(set_name):
		found = known_sets[set_name]["versions"].has(version_id)

	if channel != "global" and not found:
		badge_definition.channel = "global"
		return await _load_badge(badge_definition)

	var image_url : String = known_sets[set_name]["versions"][version_id]["image_url_%sx" % size_factor]
	return _client.request(image_url, HTTPClient.METHOD_GET, {}, "")
## Maps the badges of a response into a dict of set_id / "versions" / version id
func _cache_badges(result: Variant) -> Dictionary:
	var by_set : Dictionary = {}
	var badge_sets : Array = result["data"]
	for badge_set in badge_sets:
		var set_id = badge_set["set_id"]
		if not by_set.has(set_id):
			by_set[set_id] = { "set_id": set_id, "versions": {} }
		for version in badge_set["versions"]:
			by_set[set_id]["versions"][version["id"]] = version
	return by_set
## Returns the badge definitions of the channel and preloads them first when missing.
func get_cached_badges(channel_id: String) -> Dictionary:
	var is_missing : bool = not _cached_badges.has(channel_id)
	if is_missing:
		await preload_badges(channel_id)
	return _cached_badges[channel_id]
#endregion
#region Cheermote
## Bundles a cheermote with one of its tiers and the spriteframes of that tier.
class CheerResult extends RefCounted:
	## The resolved cheermote
	var cheermote: TwitchCheermote
	## The tier of the cheermote the spriteframes belong to
	var tier: TwitchCheermote.Tiers
	## Images of the tier
	var spriteframes: SpriteFrames

	func _init(p_cheermote: TwitchCheermote, p_tier: TwitchCheermote.Tiers, p_spriteframes: SpriteFrames):
		self.cheermote = p_cheermote
		self.tier = p_tier
		self.spriteframes = p_spriteframes
## Fetches all cheermotes and caches them by their prefix.
## Does nothing when the cheermote cache already has entries.
func preload_cheemote() -> void:
	if not _cached_cheermotes.is_empty():
		return
	_log.i("Preload cheermotes")
	var fetched: TwitchGetCheermotes.Response = await api.get_cheermotes(null)
	for cheermote: TwitchCheermote in fetched.data:
		_log.d("- found %s" % cheermote.prefix)
		_cached_cheermotes[cheermote.prefix] = cheermote
## Returns all currently cached cheermotes as typed array.
func all_cheermotes() -> Array[TwitchCheermote]:
	var collected: Array[TwitchCheermote] = []
	for cheermote: TwitchCheermote in _cached_cheermotes.values():
		collected.append(cheermote)
	return collected
## Resolves the cheermote, the tier and the spriteframes of that tier for a cheer definition.
## Returns null when the prefix or the tier of the definition is unknown.
func get_cheer_info(cheermote_definition: TwitchCheermoteDefinition) -> CheerResult:
	await preload_cheemote()
	# get() instead of [] so that an unknown prefix yields the documented null
	# instead of an invalid index error.
	var cheermote : TwitchCheermote = _cached_cheermotes.get(cheermote_definition.prefix)
	if cheermote == null:
		return null
	for cheertier: TwitchCheermote.Tiers in cheermote.tiers:
		if cheertier.id == cheermote_definition.tier:
			var sprite_frames: SpriteFrames = await _get_cheermote_sprite_frames(cheertier, cheermote_definition)
			return CheerResult.new(cheermote, cheertier, sprite_frames)
	return null
## Finds the tier that applies to the given amount of bits: the tier with the highest
## min_bits that doesn't exceed the amount. Falls back to the first tier when no tier
## matches and returns null when the cheermote has no tiers.
func find_cheer_tier(number: int, cheer_data: TwitchCheermote) -> TwitchCheermote.Tiers:
	if cheer_data.tiers.is_empty():
		return null
	var current_tier: TwitchCheermote.Tiers = cheer_data.tiers[0]
	for tier: TwitchCheermote.Tiers in cheer_data.tiers:
		# min_bits is inclusive: cheering exactly min_bits already reaches the tier.
		if tier.min_bits <= number && current_tier.min_bits < tier.min_bits:
			current_tier = tier
	return current_tier
## Returns spriteframes mapped by tier for a cheermote.
## Key: TwitchCheermote.Tiers | Value: SpriteFrames
## Preloads the cheermotes first. Tiers are either answered from the ResourceLoader
## cache or requested and awaited one after another.
func get_cheermotes(cheermote_definition: TwitchCheermoteDefinition) -> Dictionary[TwitchCheermote.Tiers, SpriteFrames]:
await preload_cheemote()
var response : Dictionary[TwitchCheermote.Tiers, SpriteFrames] = {}
var requests : Dictionary[TwitchCheermote.Tiers, BufferedHTTPClient.RequestData] = {}
# Plain index access: an unknown prefix is not handled here.
var cheer : TwitchCheermote = _cached_cheermotes[cheermote_definition.prefix]
for tier : TwitchCheermote.Tiers in cheer.tiers:
# NOTE(review): the id is taken from the definition and not from the tier of this
# iteration - confirm that get_id() yields a distinct id per tier.
var id = cheermote_definition.get_id()
if ResourceLoader.has_cached(id):
_log.d("Use cached cheer %s" % cheermote_definition)
response[tier] = ResourceLoader.load(id)
# NOTE(review): confirm the intended nesting of this check relative to the cached
# branch; switching the definition to static only after the cache lookup looks unintended.
if not image_transformer.is_supporting_animation():
cheermote_definition.type_static()
else:
_log.d("Request cheer %s" % cheermote_definition)
requests[tier] = _request_cheermote(tier, cheermote_definition)
# Wait for every started request and convert the received data.
for tier: TwitchCheermote.Tiers in requests:
var id = cheermote_definition.get_id()
var request = requests[tier]
var sprite_frames = await _wait_for_cheeremote(request, id)
response[tier] = sprite_frames
return response
## Returns the spriteframes of a cheermote tier: from the ResourceLoader cache when known,
## otherwise by requesting the image. When no request can be created, a new SpriteFrames
## with the fallback texture as only frame is returned.
func _get_cheermote_sprite_frames(tier: TwitchCheermote.Tiers, cheermote_definition: TwitchCheermoteDefinition) -> SpriteFrames:
	var cheer_id = cheermote_definition.get_id()
	if ResourceLoader.has_cached(cheer_id):
		return ResourceLoader.load(cheer_id)

	var cheer_request : BufferedHTTPClient.RequestData = _request_cheermote(tier, cheermote_definition)
	if cheer_request != null:
		return await _wait_for_cheeremote(cheer_request, cheer_id)

	var fallback_frames : SpriteFrames = SpriteFrames.new()
	fallback_frames.add_frame("default", fallback_texture)
	return fallback_frames
## Waits for the cheermote request, converts the received data into SpriteFrames,
## registers them under the cheer id and keeps a reference in the image cache.
func _wait_for_cheeremote(request: BufferedHTTPClient.RequestData, cheer_id: String) -> SpriteFrames:
	var http_response : BufferedHTTPClient.ResponseData = await _client.wait_for_request(request)
	var raw_path : String = cache_cheermote.path_join(cheer_id)
	var frames_path : String = raw_path + ".res"
	var converted = await image_transformer.convert_image(raw_path, http_response.response_data, frames_path)
	var frames : SpriteFrames = converted as SpriteFrames
	frames.take_over_path(cheer_id)
	_cached_images.append(frames)
	return frames
## Starts the request for the image of a cheermote tier, selected by theme, type and
## scale of the definition. Returns null when the image url has no "https://host/" part.
func _request_cheermote(cheer_tier: TwitchCheermote.Tiers, cheermote: TwitchCheermoteDefinition) -> BufferedHTTPClient.RequestData:
	var img_path : String = cheer_tier.images[cheermote.theme][cheermote.type][cheermote.scale]
	# The parser is only used to validate the url; the extracted host was never used.
	if _host_parser.search(img_path) == null:
		return null
	return _client.request(img_path, HTTPClient.METHOD_GET, {}, "")
#endregion
#region Utilities
## Warns when no image transformer is assigned or the assigned one reports itself as unsupported.
func _get_configuration_warnings() -> PackedStringArray:
	if image_transformer != null and image_transformer.is_supported():
		return []
	return ["Image transformer is misconfigured"]
## Downloads the image behind the url and decodes it. The received data is written to a
## temporary file (prefix "image_", extension of the url, created with keep = true) and
## the image is loaded from that file.
func load_image(url: String) -> Image:
	var image_request : BufferedHTTPClient.RequestData = _client.request(url, HTTPClient.METHOD_GET, {}, "")
	var http_response : BufferedHTTPClient.ResponseData = await _client.wait_for_request(image_request)
	var extension : String = url.get_extension()
	var scratch_file : FileAccess = FileAccess.create_temp(FileAccess.ModeFlags.WRITE_READ, "image_", extension, true)
	scratch_file.store_buffer(http_response.response_data)
	# Flush before the file is read again through its path.
	scratch_file.flush()
	return Image.load_from_file(scratch_file.get_path())
## Gets the profile image of a user.
## Returns the texture registered for the profile image url when there is one, otherwise
## downloads and decodes the image (PNG and JPEG) and registers it under the url.
## Without a user, with an unsupported content type or with undecodable data an
## unregistered copy of the fallback profile is returned.
func load_profile_image(user: TwitchUser) -> ImageTexture:
	if user == null: return _new_fallback_profile_texture()
	if ResourceLoader.has_cached(user.profile_image_url):
		return ResourceLoader.load(user.profile_image_url)
	var request := _client.request(user.profile_image_url, HTTPClient.METHOD_GET, {}, "")
	var response_data := await _client.wait_for_request(request)
	var texture : ImageTexture = ImageTexture.new()
	var response := response_data.response_data
	if not response.is_empty():
		var img := Image.new()
		var content_type = response_data.response_header["Content-Type"]
		var decode_result : Error = ERR_FILE_UNRECOGNIZED
		match content_type:
			"image/png": decode_result = img.load_png_from_buffer(response)
			"image/jpeg": decode_result = img.load_jpg_from_buffer(response)
		# Unsupported content types keep the initial error value and end up here too.
		if decode_result != OK:
			return _new_fallback_profile_texture()
		texture.set_image(img)
	else:
		# Don't use `texture = fallback_profile` as texture cause the path will be taken over
		# for caching purpose!
		texture.set_image(fallback_profile.get_image())
	texture.take_over_path(user.profile_image_url)
	return texture

## Creates a new ImageTexture from the image of the fallback profile. fallback_profile is
## only a Texture2D, so returning it directly violates the ImageTexture return type.
func _new_fallback_profile_texture() -> ImageTexture:
	return ImageTexture.create_from_image(fallback_profile.get_image())
## The first three bytes of a GIF file ("GIF")
const GIF_HEADER: PackedByteArray = [71, 73, 70]
## Waits for the request and converts the received data into SpriteFrames. Data starting
## with the GIF header goes through image_transformer, everything else through
## static_image_transformer.
func _convert_response(request: BufferedHTTPClient.RequestData, cache_path: String, spriteframe_path: String) -> SpriteFrames:
	var http_response = await _client.wait_for_request(request)
	var raw_data = http_response.response_data as PackedByteArray
	# REMARK: don't use content-type... twitch doesn't check and sends PNGs with GIF content type.
	var is_gif : bool = raw_data.slice(0, 3) == GIF_HEADER
	if not is_gif:
		return await static_image_transformer.convert_image(cache_path, raw_data, spriteframe_path) as SpriteFrames
	return await image_transformer.convert_image(cache_path, raw_data, spriteframe_path) as SpriteFrames
#endregion