This repository has been archived on 2022-08-28. You can view files and clone it, but cannot push or open issues or pull requests.
vk-mastodon-bridge/vk_mastodon_bridge.py

215 lines
7.5 KiB
Python

__version__ = '0.2.1'
import os
import sys
import json
import time
import datetime
import logging
import shutil
import urllib.parse
import requests
MASTODON_API_URL = os.environ['MASTODON_API_URL']
MASTODON_API_ACCESS_TOKEN = os.environ['MASTODON_API_ACCESS_TOKEN']
VK_API_URL = os.environ['VK_API_URL']
VK_API_VERSION = os.environ['VK_API_VERSION']
VK_API_ACCESS_TOKEN = os.environ['VK_API_ACCESS_TOKEN']
VK_GROUP_DOMAIN = os.environ['VK_GROUP_DOMAIN']
# Set up logger
logger = logging.getLogger('vk_mastodon_bridge')
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(logging.Formatter(fmt = '[%(asctime)s: %(levelname)s] %(message)s'))
logger.addHandler(handler)
def get_vk_group_last_post():
"""Return dict with VK group last post data."""
return json.loads(requests.get(VK_API_URL + '/wall.get' \
+ '?v=' + VK_API_VERSION \
+ '&access_token=' + VK_API_ACCESS_TOKEN \
+ '&domain=' + VK_GROUP_DOMAIN \
+ '&count=1').text)
def get_vk_post_text(post_data: dict) -> str:
"""See: https://dev.vk.com/reference/objects/post"""
return post_data['response']['items'][0]['text']
def get_vk_post_url(post_data: dict) -> str:
"""Return link to original post on vk.com
See: https://dev.vk.com/reference/objects/post
"""
wall_id = str(post_data['response']['items'][0]['owner_id'])
post_id = str(post_data['response']['items'][0]['id'])
return 'https://vk.com/wall' + wall_id + '_' + post_id
def get_vk_post_attachments(post_data: dict) -> list:
"""Process attachments. See attachments at
https://dev.vk.com/method/wall.post
Return list of dicts with following structure::
[{'photo': 'url'}, {'album': 'url'}]
"""
attachments = []
raw_attachments = post_data['response']['items'][0]['attachments']
for attachment in raw_attachments:
if attachment['type'] == 'photo':
# Get photo in max size by height (photos are proportionally resized)
height = [ photo['height'] for photo in attachment['photo']['sizes'] ]
for photo in attachment['photo']['sizes']:
if photo['height'] == max(height):
photo_url = photo['url']
attachments.append({'photo': photo_url})
elif attachment['type'] == 'video':
pass
elif attachment['type'] == 'audio':
pass
elif attachment['type'] == 'doc':
pass
elif attachment['type'] == 'page':
pass
elif attachment['type'] == 'note':
pass
elif attachment['type'] == 'pull':
pass
elif attachment['type'] == 'album':
owner_id = str(attachment['album']['owner_id'])
id = str(attachment['album']['id'])
album_url = 'https://vk.com/album' + owner_id + '_' + id
attachments.append({'album': album_url})
elif attachment['type'] == 'market':
pass
elif attachment['type'] == 'market_album':
pass
elif attachment['type'] == 'audio_playlist':
pass
return attachments
def download_file(url: str) -> str:
"""Save file to /tmp. Return file path"""
filename = '/tmp/' + os.path.basename(urllib.parse.urlparse(url).path)
response = requests.get(url, stream=True)
with open(filename, 'wb') as out_file:
shutil.copyfileobj(response.raw, out_file)
del response
return filename
def post_media(file: str) -> str:
"""Upload media file to Mastodon."""
headers = {'Authorization': 'Bearer ' + MASTODON_API_ACCESS_TOKEN}
files = {'file': open(file,'rb')}
response = requests.post(MASTODON_API_URL + '/media', files=files, headers=headers)
logger.info('Post media on Mastodon. Response: ' \
+ str(response.status_code) + ' ' + str(response.text))
# Sleep some seconds to prevent HTTP 429 response.
try:
request_delay = int(os.environ['REQUEST_DELAY'])
except (KeyError, TypeError):
request_delay = 1
time.sleep(request_delay)
return response
def publish_toot(post_text: str, media_ids: list):
"""Post toot on Mastodon.
See: https://docs.joinmastodon.org/methods/statuses/
"""
headers = {'Authorization': 'Bearer ' + MASTODON_API_ACCESS_TOKEN}
params = {'status': post_text, 'media_ids[]': media_ids}
response = requests.post(MASTODON_API_URL + '/statuses', data=params, headers=headers)
post_url = json.loads(response.text)['url']
logger.info('Publish status. Response: ' \
+ str(response.status_code) + ' Status: ' + str(post_url))
return response
def post_toot(post_data: dict) -> str:
"""Upload media files, generate status text for Mastodon post and publish.
"""
post_text = get_vk_post_text(post_data)
vk_post_url = get_vk_post_url(post_data)
attachments = get_vk_post_attachments(post_data)
# Upload attachments!
# Upload only first 4 photos and get media_ids
i = 0
media_ids = []
logger.info('Attachments: %s' % str(attachments))
attachments_count = len(attachments)
if attachments_count > 4:
attachments_count = 4
while i < attachments_count:
if list(attachments[i].keys())[0] == 'photo':
photo_url = attachments[i]['photo']
logger.info('Download image: %s' % photo_url)
tmpfile = download_file(photo_url) # Download file from VK
logger.info('Image saved locally as: %s' % tmpfile)
media_id = json.loads(post_media(tmpfile).text)['id'] # Upload file to Mastodon
media_ids.append(media_id) # Save uploaded media IDs
logger.info('Remove local file: %s' % tmpfile)
os.remove(tmpfile) # Remove local file
i += 1
# Build attachments list
post_attachments = ''
for attachment in attachments:
key = str(list(attachment.keys())[0])
# Example resulting string: 'Album: https://vk.com/album-26788782_284176934'
post_attachments = post_attachments + key.title() + ': ' + attachment[key] + '\n'
# Get status text
text = post_text + '\n\n' + 'Source: ' + vk_post_url + '\n\nAttachments:\n' + post_attachments
# Post toot!
publish_toot(text, media_ids)
def touch_lock_file(file: str, post_id: int):
with open(file, 'w') as lock:
data = json.dumps({'post_id': str(post_id)})
lock.write(data)
def read_lock_file(file: str):
with open(file, 'r') as lock:
data = json.loads(lock.read())
return data['post_id']
def poll():
logger.info('Start polling %s' % datetime.datetime.now().isoformat())
lock_file = './data/post_id.json'
if os.path.exists(lock_file):
prev_post_id = read_lock_file(lock_file)
logger.info('Read last post ID from file: %s' % lock_file)
logger.info('Last post ID: %s' % prev_post_id)
else:
prev_post_id = 0
logger.info('Last post ID: 0')
while True:
post_data = get_vk_group_last_post() # raw data
post_id = post_data['response']['items'][0]['id']
# Don't post duplicates
if int(post_id) == int(prev_post_id):
logger.info('Post with VK ID %s already posted, skipping' % post_id)
else:
# Toot!
logger.info('Toot! VK post ID: %s' % post_id)
post_toot(post_data)
touch_lock_file(lock_file, post_id)
prev_post_id = read_lock_file(lock_file)
try:
poll_time = int(os.environ['POLLING_TIME'])
except (KeyError, TypeError):
poll_time = 300
time.sleep(poll_time)
if __name__ == '__main__':
poll()