1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
"""Word-count reducer driven by a COS bucket event.

``main_handler(event, context)`` is the entry point.  It downloads the
object named in the event (a text file of ``word<TAB>count`` lines),
sums the counts per word, writes the totals sorted by descending count
to a temporary file, and uploads that file to the result bucket.
"""
import datetime
import logging
import os
import sys
from operator import itemgetter

from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client
from qcloud_cos_v5 import CosServiceError

# Region the COS client is configured for.
region = u'ap-guangzhou'
# Name (without the "-<appid>" suffix) of the bucket that receives results.
result_bucket = u'destmr'

logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger()
logger.setLevel(level=logging.INFO)


def delete_file_folder(src):
    """Remove the file, or the directory tree, at ``src`` on a best-effort basis.

    Failures to remove an entry are logged and otherwise ignored so that
    cleanup never aborts the caller.  Paths that are neither a file nor a
    directory are left alone.
    """
    if os.path.isfile(src):
        try:
            os.remove(src)
        except OSError as e:
            logger.warning("Failed to remove file [%s]: %s", src, e)
    elif os.path.isdir(src):
        for item in os.listdir(src):
            delete_file_folder(os.path.join(src, item))
        try:
            os.rmdir(src)
        except OSError as e:
            logger.warning("Failed to remove directory [%s]: %s", src, e)


def download_file(cos_client, bucket, key, download_path):
    """Download object ``key`` from ``bucket`` into ``download_path``.

    Returns 0 on success and -1 when the service reports an error.
    """
    logger.info("Get from [%s] to download file [%s]" % (bucket, key))
    try:
        response = cos_client.get_object(Bucket=bucket, Key=key)
        response['Body'].get_stream_to_file(download_path)
    except CosServiceError as e:
        logger.error("Download of [%s] failed: %s %s",
                     key, e.get_error_code(), e.get_error_msg())
        return -1
    return 0


def upload_file(cos_client, bucket, key, local_file_path):
    """Upload ``local_file_path`` to ``bucket`` under ``key``.

    Returns 0 on success and -1 when the service reports an error.
    """
    logger.info("Start to upload file to cos")
    try:
        cos_client.put_object_from_local_file(
            Bucket=bucket,
            LocalFilePath=local_file_path,
            Key='{}'.format(key))
    except CosServiceError as e:
        logger.error("Upload of [%s] failed: %s %s",
                     key, e.get_error_code(), e.get_error_msg())
        return -1
    logger.info("Upload data map file [%s] Success" % key)
    return 0


def _count_words(lines):
    """Sum the counts per word from an iterable of ``word<TAB>count`` lines.

    Blank lines are skipped; lines that lack a tab or whose count is not
    an integer are logged and skipped instead of aborting the whole run.
    """
    word2count = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            # Both the unpacking and int() raise ValueError on bad input.
            word, count = line.split('\t', 1)
            word2count[word] = word2count.get(word, 0) + int(count)
        except ValueError as e:
            logger.error("error value: %s, current line: %s" % (e, line))
    return word2count


def _write_counts(word2count, result_file_path):
    """Write ``word<TAB>count`` lines to ``result_file_path``, largest count first."""
    # Ascending stable sort followed by a reversal (rather than
    # reverse=True) keeps the established ordering of words that share
    # the same count.
    ordered = sorted(word2count.items(), key=itemgetter(1))[::-1]
    with open(result_file_path, 'w') as result_file:
        for word, count in ordered:
            result_file.write('%s\t%s\n' % (word, count))


def qcloud_reducer(cos_client, bucket, key, result_bucket, result_key):
    """Reduce the map output stored at ``key`` and upload the totals.

    Downloads ``key`` from ``bucket`` to /tmp, aggregates the counts,
    writes the sorted result to /tmp and uploads it to ``result_bucket``
    as ``result_key``.  Both temporary files are removed afterwards,
    whether or not the run succeeded.

    Returns 0 on success and -1 when the download or the upload fails.
    """
    file_name = key.split('/')[-1]
    if not file_name:
        # A key ending in '/' would map to '/tmp/' itself, and the cleanup
        # below would then recursively empty the temp directory.
        logger.error("Key [%s] does not name a file" % key)
        return -1

    src_file_path = u'/tmp/' + file_name
    result_file_path = u'/tmp/' + u'result_' + file_name
    try:
        if download_file(cos_client, bucket, key, src_file_path) != 0:
            return -1
        with open(src_file_path, 'r') as map_file:
            word2count = _count_words(map_file)
        _write_counts(word2count, result_file_path)
        return upload_file(cos_client, result_bucket, result_key,
                           result_file_path)
    finally:
        delete_file_folder(src_file_path)
        delete_file_folder(result_file_path)


def reduce_caller(event, context, cos_client):
    """Derive bucket and key names from ``event`` and run the reducer.

    Reads the first entry of ``event['Records']``.  ``context`` is
    accepted for signature compatibility and is not used.

    Returns the status code of ``qcloud_reducer``.
    """
    cos_record = event['Records'][0]['cos']
    bucket_info = cos_record['cosBucket']
    appid = bucket_info['appid']
    bucket_name = bucket_info['name']
    bucket = bucket_name + '-' + appid

    # Strip the first "/<appid>/<bucket name>/" occurrence from the event key.
    key = cos_record['cosObject']['key']
    key = key.replace('/' + str(appid) + '/' + bucket_name + '/', '', 1)
    logger.info("Key is " + key)

    res_bucket = result_bucket + '-' + appid
    result_key = '/' + 'result_' + key.split('/')[-1]
    return qcloud_reducer(cos_client, bucket, key, res_bucket, result_key)


def main_handler(event, context):
    """Entry point: run the reduce step for the object named in ``event``.

    Returns an error dict when ``event`` has no "Records" entry,
    otherwise a status string reporting success or failure.
    """
    logger.info("start main handler")
    if "Records" not in event:
        return {"errorMsg": "event is not come from cos"}

    # NOTE(review): these are placeholder credentials that must be replaced
    # before deployment; prefer supplying real values via configuration
    # rather than committing them to source.
    secret_id = "SecretId"
    secret_key = "SecretKey"
    config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key)
    cos_client = CosS3Client(config)

    start_time = datetime.datetime.now()
    res = reduce_caller(event, context, cos_client)
    end_time = datetime.datetime.now()
    # total_seconds() covers the whole interval; .microseconds alone is only
    # the sub-second component and under-reports runs of a second or more.
    elapsed_ms = (end_time - start_time).total_seconds() * 1000
    print("data reducing duration: " + str(elapsed_ms) + "ms")

    if res == 0:
        return "Data reducing SUCCESS"
    return "Data reducing FAILED"
|