__version__ = '0.2.1' import os import sys import json import time import datetime import logging import shutil import urllib.parse import requests MASTODON_API_URL = os.environ['MASTODON_API_URL'] MASTODON_API_ACCESS_TOKEN = os.environ['MASTODON_API_ACCESS_TOKEN'] VK_API_URL = os.environ['VK_API_URL'] VK_API_VERSION = os.environ['VK_API_VERSION'] VK_API_ACCESS_TOKEN = os.environ['VK_API_ACCESS_TOKEN'] VK_GROUP_DOMAIN = os.environ['VK_GROUP_DOMAIN'] # Set up logger logger = logging.getLogger('vk_mastodon_bridge') logger.setLevel(logging.INFO) handler = logging.StreamHandler(stream=sys.stdout) handler.setFormatter(logging.Formatter(fmt = '[%(asctime)s: %(levelname)s] %(message)s')) logger.addHandler(handler) def get_vk_group_last_post(): """Return dict with VK group last post data.""" return json.loads(requests.get(VK_API_URL + '/wall.get' \ + '?v=' + VK_API_VERSION \ + '&access_token=' + VK_API_ACCESS_TOKEN \ + '&domain=' + VK_GROUP_DOMAIN \ + '&count=1').text) def get_vk_post_text(post_data: dict) -> str: """See: https://dev.vk.com/reference/objects/post""" return post_data['response']['items'][0]['text'] def get_vk_post_url(post_data: dict) -> str: """Return link to original post on vk.com See: https://dev.vk.com/reference/objects/post """ wall_id = str(post_data['response']['items'][0]['owner_id']) post_id = str(post_data['response']['items'][0]['id']) return 'https://vk.com/wall' + wall_id + '_' + post_id def get_vk_post_attachments(post_data: dict) -> list: """Process attachments. See attachments at https://dev.vk.com/method/wall.post Return list of dicts with following structure:: [{'photo': 'url'}, {'album': 'url'}] """ attachments = [] raw_attachments = post_data['response']['items'][0]['attachments'] for attachment in raw_attachments: if attachment['type'] == 'photo': # Get photo in max size by height (photos are proportionally resized) height = [ photo['height'] for photo in attachment['photo']['sizes'] ] for photo in attachment['photo']['sizes']: if photo['height'] == max(height): photo_url = photo['url'] attachments.append({'photo': photo_url}) elif attachment['type'] == 'video': pass elif attachment['type'] == 'audio': pass elif attachment['type'] == 'doc': pass elif attachment['type'] == 'page': pass elif attachment['type'] == 'note': pass elif attachment['type'] == 'pull': pass elif attachment['type'] == 'album': owner_id = str(attachment['album']['owner_id']) id = str(attachment['album']['id']) album_url = 'https://vk.com/album' + owner_id + '_' + id attachments.append({'album': album_url}) elif attachment['type'] == 'market': pass elif attachment['type'] == 'market_album': pass elif attachment['type'] == 'audio_playlist': pass return attachments def download_file(url: str) -> str: """Save file to /tmp. Return file path""" filename = '/tmp/' + os.path.basename(urllib.parse.urlparse(url).path) response = requests.get(url, stream=True) with open(filename, 'wb') as out_file: shutil.copyfileobj(response.raw, out_file) del response return filename def post_media(file: str) -> str: """Upload media file to Mastodon.""" headers = {'Authorization': 'Bearer ' + MASTODON_API_ACCESS_TOKEN} files = {'file': open(file,'rb')} response = requests.post(MASTODON_API_URL + '/media', files=files, headers=headers) logger.info('Post media on Mastodon. Response: ' \ + str(response.status_code) + ' ' + str(response.text)) # Sleep some seconds to prevent HTTP 429 response. try: request_delay = int(os.environ['REQUEST_DELAY']) except (KeyError, TypeError): request_delay = 1 time.sleep(request_delay) return response def publish_toot(post_text: str, media_ids: list): """Post toot on Mastodon. See: https://docs.joinmastodon.org/methods/statuses/ """ headers = {'Authorization': 'Bearer ' + MASTODON_API_ACCESS_TOKEN} params = {'status': post_text, 'media_ids[]': media_ids} response = requests.post(MASTODON_API_URL + '/statuses', data=params, headers=headers) post_url = json.loads(response.text)['url'] logger.info('Publish status. Response: ' \ + str(response.status_code) + ' Status: ' + str(post_url)) return response def post_toot(post_data: dict) -> str: """Upload media files, generate status text for Mastodon post and publish. """ post_text = get_vk_post_text(post_data) vk_post_url = get_vk_post_url(post_data) attachments = get_vk_post_attachments(post_data) # Upload attachments! # Upload only first 4 photos and get media_ids i = 0 media_ids = [] logger.info('Attachments: %s' % str(attachments)) attachments_count = len(attachments) if attachments_count > 4: attachments_count = 4 while i < attachments_count: if list(attachments[i].keys())[0] == 'photo': photo_url = attachments[i]['photo'] logger.info('Download image: %s' % photo_url) tmpfile = download_file(photo_url) # Download file from VK logger.info('Image saved locally as: %s' % tmpfile) media_id = json.loads(post_media(tmpfile).text)['id'] # Upload file to Mastodon media_ids.append(media_id) # Save uploaded media IDs logger.info('Remove local file: %s' % tmpfile) os.remove(tmpfile) # Remove local file i += 1 # Build attachments list post_attachments = '' for attachment in attachments: key = str(list(attachment.keys())[0]) # Example resulting string: 'Album: https://vk.com/album-26788782_284176934' post_attachments = post_attachments + key.title() + ': ' + attachment[key] + '\n' # Get status text text = post_text + '\n\n' + 'Source: ' + vk_post_url + '\n\nAttachments:\n' + post_attachments # Post toot! publish_toot(text, media_ids) def touch_lock_file(file: str, post_id: int): with open(file, 'w') as lock: data = json.dumps({'post_id': str(post_id)}) lock.write(data) def read_lock_file(file: str): with open(file, 'r') as lock: data = json.loads(lock.read()) return data['post_id'] def poll(): logger.info('Start polling %s' % datetime.datetime.now().isoformat()) lock_file = './data/post_id.json' if os.path.exists(lock_file): prev_post_id = read_lock_file(lock_file) logger.info('Read last post ID from file: %s' % lock_file) logger.info('Last post ID: %s' % prev_post_id) else: prev_post_id = 0 logger.info('Last post ID: 0') while True: post_data = get_vk_group_last_post() # raw data post_id = post_data['response']['items'][0]['id'] # Don't post duplicates if int(post_id) == int(prev_post_id): logger.info('Post with VK ID %s already posted, skipping' % post_id) else: # Toot! logger.info('Toot! VK post ID: %s' % post_id) post_toot(post_data) touch_lock_file(lock_file, post_id) prev_post_id = read_lock_file(lock_file) try: poll_time = int(os.environ['POLLING_TIME']) except (KeyError, TypeError): poll_time = 300 time.sleep(poll_time) if __name__ == '__main__': poll()