upload.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. # mautrix-instagram - A Matrix-Instagram puppeting bridge.
  2. # Copyright (C) 2022 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from __future__ import annotations
  17. from uuid import uuid4
  18. import json
  19. import random
  20. import time
  21. from ..types import FinishUploadResponse, MediaType, UploadPhotoResponse, UploadVideoResponse
  22. from .base import BaseAndroidAPI
  23. class UploadAPI(BaseAndroidAPI):
  24. async def upload_photo(
  25. self,
  26. data: bytes,
  27. mime: str,
  28. upload_id: str | None = None,
  29. width: int | None = None,
  30. height: int | None = None,
  31. ) -> UploadPhotoResponse:
  32. upload_id = upload_id or str(int(time.time() * 1000))
  33. name = f"{upload_id}_0_{random.randint(1000000000, 9999999999)}"
  34. params = {
  35. "retry_context": json.dumps(
  36. {
  37. "num_step_auto_retry": 0,
  38. "num_reupload": 0,
  39. "num_step_manual_retry": 0,
  40. }
  41. ),
  42. "media_type": str(MediaType.IMAGE.value),
  43. "upload_id": upload_id,
  44. "xsharing_user_ids": json.dumps([]),
  45. }
  46. if mime == "image/jpeg":
  47. params["image_compression"] = json.dumps(
  48. {"lib_name": "moz", "lib_version": "3.1.m", "quality": 80}
  49. )
  50. if width and height:
  51. params["original_width"] = str(width)
  52. params["original_height"] = str(height)
  53. headers = {
  54. "X_FB_PHOTO_WATERFALL_ID": str(uuid4()),
  55. "X-Entity-Type": mime,
  56. "Offset": "0",
  57. "X-Instagram-Rupload-Params": json.dumps(params),
  58. "X-Entity-Name": name,
  59. "X-Entity-Length": str(len(data)),
  60. "Content-Type": "application/octet-stream",
  61. "priority": "u=6, i",
  62. }
  63. return await self.std_http_post(
  64. f"/rupload_igphoto/{name}",
  65. headers=headers,
  66. data=data,
  67. raw=True,
  68. response_type=UploadPhotoResponse,
  69. )
  70. async def upload_mp4(
  71. self,
  72. data: bytes,
  73. upload_id: str | None = None,
  74. audio: bool = False,
  75. duration_ms: int | None = None,
  76. width: int | None = None,
  77. height: int | None = None,
  78. ) -> tuple[UploadVideoResponse, str]:
  79. upload_id = upload_id or str(int(time.time() * 1000))
  80. name = f"{upload_id}_0_{random.randint(1000000000, 9999999999)}"
  81. media_type = MediaType.AUDIO if audio else MediaType.VIDEO
  82. params: dict[str, str] = {
  83. "retry_context": json.dumps(
  84. {
  85. "num_step_auto_retry": 0,
  86. "num_reupload": 0,
  87. "num_step_manual_retry": 0,
  88. }
  89. ),
  90. "media_type": str(media_type.value),
  91. "upload_id": upload_id,
  92. "xsharing_user_ids": json.dumps([]),
  93. }
  94. if duration_ms:
  95. params["upload_media_duration_ms"] = str(duration_ms)
  96. if audio:
  97. params["is_direct_voice"] = "1"
  98. else:
  99. params["direct_v2"] = "1"
  100. params["for_direct_story"] = "1"
  101. params["content_tags"] = "use_default_cover"
  102. params["extract_cover_frame"] = "1"
  103. if width and height:
  104. params["upload_media_width"] = str(width)
  105. params["upload_media_height"] = str(height)
  106. headers = {
  107. "X_FB_VIDEO_WATERFALL_ID": str(uuid4()),
  108. "X-Entity-Type": "audio/mp4" if audio else "video/mp4",
  109. "Offset": "0",
  110. "X-Instagram-Rupload-Params": json.dumps(params),
  111. "X-Entity-Name": name,
  112. "X-Entity-Length": str(len(data)),
  113. "Content-Type": "application/octet-stream",
  114. "priority": "u=6, i",
  115. }
  116. if not audio:
  117. headers["segment-type"] = "3"
  118. headers["segment-start-offset"] = "0"
  119. return (
  120. await self.std_http_post(
  121. f"/rupload_igvideo/{name}",
  122. headers=headers,
  123. data=data,
  124. raw=True,
  125. response_type=UploadVideoResponse,
  126. ),
  127. upload_id,
  128. )
  129. async def finish_upload(
  130. self, upload_id: str, source_type: str, video: bool = False
  131. ) -> FinishUploadResponse:
  132. headers = {
  133. "retry_context": json.dumps(
  134. {
  135. "num_step_auto_retry": 0,
  136. "num_reupload": 0,
  137. "num_step_manual_retry": 0,
  138. }
  139. ),
  140. }
  141. req = {
  142. "timezone_offset": self.state.device.timezone_offset,
  143. "_csrftoken": self.state.cookies.csrf_token,
  144. "source_type": source_type,
  145. "_uid": self.state.cookies.user_id,
  146. "device_id": self.state.device.id,
  147. "_uuid": self.state.device.uuid,
  148. "upload_id": upload_id,
  149. "device": self.state.device.payload,
  150. }
  151. query = {}
  152. if video:
  153. query["video"] = "1"
  154. return await self.std_http_post(
  155. "/api/v1/media/upload_finish/",
  156. headers=headers,
  157. data=req,
  158. query=query,
  159. response_type=FinishUploadResponse,
  160. )