pandas的apply函数常见用法总结

2023年4月11日12:07:27

函数介绍

pandas的apply函数通常用于一些复杂的遍历操作(遍历可迭代对象的同时执行一些自定义函数),它的可定制程度高,而且比itterrows、for等操作效率更高,是我非常喜欢而且常用的一个函数。apply的主要参数和对应说明可以查看官网(里面已经说得很详细)
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.apply.html

常见用法

对DataFrame的每一行执行一些复杂的操作

举一个例子,计算DataFrame中每一条数据中两个人的轨迹相似度,因为和业务相关,里面的子函数不能透露,这里重点呈现apply的用法。

import numpy as np


def calculate_trajectory_similarity(df, trail_dict):
	"""

	:param trail_dict: 
	:param df: 
	:return: 
	"""
	body_threshold = 2
	similarity_threshold = 0.6
	insert_columns = [
		'trajectory_similarity',
	]
	for c in insert_columns:
		try:
			# 
			df.insert(df.shape[1], c, 0)
		except ValueError as v:
			# 
			print(str(v))
	
	def calc_trajectory_similarity(element):
		"""计算XXX"""
		trail01 = trail_dict.get(element['record_id1'])
		trail02 = trail_dict.get(element['record_id2'])
		trail_arr1 = np.array(trail01)
		trail_arr2 = np.array(trail02)
		face_trail01 = trail_arr1[trail_arr1[:, -1] == 'face']
		body_trail01 = trail_arr1[trail_arr1[:, -1] == 'body']
		face_trail02 = trail_arr2[trail_arr2[:, -1] == 'face']
		body_trail02 = trail_arr2[trail_arr2[:, -1] == 'body']
		sub_trail_list = [
			(face_trail01, face_trail02),
			(face_trail01, body_trail02),
			(body_trail01, face_trail02),
			(body_trail01, body_trail02)
		]
		# 
		avg_body_width = (element['body_width1'] + element['body_width2']) / 2
		tr_similarity_list = []
		for s in sub_trail_list:
			coordinate_arr1, coordinate_arr2, time_overlap = get_real_time_coordinates(s[0], s[1])
			if time_overlap > 0:
				sub_similarity = trajectory_similarity(coordinate_arr1, coordinate_arr2, avg_body_width, body_threshold,
				                                       similarity_threshold)
			else:
				sub_similarity = 0
			tr_similarity_list.append((min(len(coordinate_arr1), len(coordinate_arr2)), sub_similarity))
		if len(tr_similarity_list) > 0:
			weights = [i[0] for i in tr_similarity_list]
			if np.sum(weights) > 0:  # 
				tr_similarity = np.sum([w * s for w, s in tr_similarity_list]) / np.sum(weights)
			else:
				tr_similarity = 0
		else:
			tr_similarity = 0
		element['trajectory_similarity'] = tr_similarity
		return element
	
	df = df.apply(calc_trajectory_similarity, axis=1)
	return df

里面最核心的操作是df = df.apply(calc_trajectory_similarity, axis=1),这行代码通过apply调用了calc_trajectory_similarity这个函数,并按照行遍历DataFrame,利用每一行(Series对象)的一些字段信息,计算出轨迹相似度,并存储到DataFrame中。get_real_time_coordinatestrajectory_similarity分别是统计实时点和计算轨迹相似度的自定义函数,在这里可以不用关注。

对Series的每一个元素执行一些复杂操作

举个例子,现有一些原始的轨迹数据,需要进行预处理,可以针对需要处理的DataFrame字段(Series格式)单独进行操作。

import re


def split_to_int(element):
	"""XXX"""
	if element:
		return list(map(int, re.findall(r"[\d]+", element)))
	else:
		element = []
		return element


def split_to_list(element):
	"""XXX"""
	if element:
		element = list(re.findall(r"[\d]+", element))
		element = list(map(convert_time, element))
		return element
	else:
		element = []
		return element


def trail_string_processing(df):
	"""
	
	:param df:
	:return:
	"""
	# 
	pd.set_option('mode.chained_assignment', None)
	trail_name = [
		'trail_left_top_x',
		'trail_left_top_y',
		'trail_right_btm_x',
		'trail_right_btm_y',
	]
	for t in trail_name:
		df.loc[:, t] = df[t].apply(split_to_int)
	return df


def time_string_processing(df):
	"""
	XXX
	:param df:
	:return: 
	"""
	# XXX
	pd.set_option('mode.chained_assignment', None)
	df.loc[:, 'trail_point_time'] = df['trail_point_time'].apply(split_to_list)
	# 
	df.loc[:, 'shot_time'] = df['shot_time'].apply(
		lambda x: x.tz_convert('Asia/Shanghai').tz_localize(None) if x.tz else x)
	return df

在上面的代码中,每一个apply都是针对series执行的操作,apply里面的函数可以是自定义函数,也可以是lambda匿名函数。

对GroupBy对象执行一些复杂操作

举个例子,现有一个DataFrame需要按照某些字段进行分组,然后对分组后的对象执行一些操作,然后重构为新的DataFrame,这时可以通过apply来实现。

import pandas as pd


def merge_key_person_info(df):
	"""
	XXXX
	:param df: 
	:return:
	"""
	
	def group_by_key_person(element):
		element = element.drop_duplicates(subset=['pvid', 'rel_pvid'])
		# 
		key_person_code = element['key_person_code'].iloc[0]
		if key_person_code == 'tag_is_family':
			max_members_num = 6
		else:
			max_members_num = 11
		key_person_num = len(element['pvid'].iloc[0].split(','))
		num_k = max_members_num - key_person_num
		num_k = num_k if num_k > 1 else 1
		element = element.sort_values(by=['relation_score'], ascending=False).iloc[:num_k, :]
		# 
		key_person_score = list(set(element['key_person_score'].values))
		rel_pvid_list = list(element['rel_pvid'].values)
		relation_code_list = list(element['relation_code'].values)
		relation_score_list = list(element['relation_score'].values)
		start_time_list = list(element['relation_info_start_time'].values)
		end_time_list = list(element['relation_info_end_time'].values)
		series_dict = {
			'pvid': element['pvid'].iloc[0],
			'corp_id': element['corp_id'].iloc[0],
			'key_person_code': element['key_person_code'].iloc[0],
			'key_person_score': key_person_score,
			'rel_pvid': rel_pvid_list,
			'relation_code': relation_code_list,
			'relation_score': relation_score_list,
			'relation_info_start_time': start_time_list,
			'relation_info_end_time': end_time_list
		}
		result = pd.Series(series_dict)
		return result
	
	# 
	group_by_obj = df.groupby(by=['pvid', 'corp_id', 'key_person_code'])
	group_df = group_by_obj.apply(group_by_key_person).reset_index(drop=True)
	return group_df

numpy的替代写法

有时候为了提升效率,一些涉及到大量数值计算的apply可以使用numpy的.apply_along_axis替代。

def calculate_speed_and_angle_similarity(parameters_df):
    """
    
    :param parameters_df: 
    :return: 
    """
    try:
        # 
        parameters_df.insert(parameters_df.shape[1], 'angle_similarity', 0)
        parameters_df.insert(parameters_df.shape[1], 'speed_similarity', 0)
    except ValueError as v:
        #
        logger = my_logger()
        logger.info(str(v))

    def calc_angle_speed_similarity(element):
        """XXXX"""
        angle1 = element[35]
        angle2 = element[83]

        moving_speed1 = element[43]
        moving_speed2 = element[91]

        # 
        angle_difference = abs(angle1 - angle2)
        if angle_difference >= 90:  # 
            angle_similarity = 0
        else:
            angle_similarity = np.cos(abs(angle1 - angle2) / 180 * np.pi)
        element[102] = angle_similarity

        # 
        slower_speed = min(moving_speed1, moving_speed2)
        faster_speed = max(moving_speed1, moving_speed2)
        speed_similarity = slower_speed / faster_speed
        element[103] = speed_similarity
        return element

    arr = parameters_df.values
    new_arr = np.apply_along_axis(calc_angle_speed_similarity, axis=1, arr=arr)
    parameters_df = pd.DataFrame(new_arr, columns=parameters_df.columns)
    return parameters_df

按照上述写法,虽然可以在一定程度上提升运行速度,但由于ndarray不支持字符串索引,对字段的操作只能按照序号来进行,很容易出错,代码可读性也比较差,不太推荐在复杂函数中使用,简单的计算用np.apply_along_axis会比较适合。

上面的代码都是一些模块的片段,只是用来展示apply的用法,因此无法跑通,请多包涵。为了信息安全,所有注释和细节代码都删除了。

  • 作者:Ray_awakepure
  • 原文链接:https://blog.csdn.net/Ray_awakepure/article/details/121778153
    更新时间:2023年4月11日12:07:27 ,共 5746 字。